Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from gradio_chessboard import Chessboard | |
| import chess | |
| import chess.engine | |
| import pandas as pd | |
| import os | |
| import uuid | |
| from openai import OpenAI | |
| import time | |
# Server-side fallback OpenAI key, read once at import time; None when the
# environment variable is not set.
SYSTEM_KEY = os.environ.get("OPENAI_API_KEY")
# Path of the Stockfish binary opened by the engine helpers in this file.
STOCKFISH_PATH = "/usr/games/stockfish"
def get_client(user_key):
    """Build an OpenAI client from the user's key, falling back to the system key.

    Args:
        user_key: Key typed by the user; may be None, empty or whitespace-only.

    Returns:
        An ``OpenAI`` client, or None when neither a usable user key nor
        ``SYSTEM_KEY`` is available.
    """
    # Strip before choosing, so a whitespace-only entry falls back to
    # SYSTEM_KEY instead of being treated as "no key anywhere".
    key = (user_key or "").strip() or SYSTEM_KEY
    if not key:
        return None
    return OpenAI(api_key=key)
| def _load_lichess_openings(path_prefix="/app/data/lichess_openings/dist/"): | |
| try: | |
| files = [f"{path_prefix}{vol}.tsv" for vol in ("a", "b", "c", "d", "e")] | |
| dfs = [] | |
| for fn in files: | |
| if os.path.exists(fn): | |
| df = pd.read_csv(fn, sep="\t", usecols=["eco", "name", "pgn", "uci", "epd"]) | |
| dfs.append(df) | |
| if not dfs: return pd.DataFrame() | |
| return pd.concat(dfs, ignore_index=True) | |
| except: | |
| return pd.DataFrame() | |
| OPENINGS_DB = _load_lichess_openings() | |
def analyze_tactics(board):
    """Summarise check, pins and material for the position.

    Args:
        board: A ``chess.Board``.

    Returns:
        A single-line string made of, in order: ``"KING IN CHECK."`` when the
        board reports check, the squares of the side-to-move's pinned pieces
        (if any), and the material count for both sides using
        pawn=1, knight=3, bishop=3, rook=5, queen=9.
    """
    turn = board.turn

    # Only pieces of the side to move are checked for pins.
    pins = []
    for sq in chess.SQUARES:
        piece = board.piece_at(sq)
        if not piece or piece.color != turn:
            continue
        if board.is_pinned(turn, sq):
            pins.append(chess.square_name(sq))

    values = {chess.PAWN: 1, chess.KNIGHT: 3, chess.BISHOP: 3, chess.ROOK: 5, chess.QUEEN: 9}
    w_mat = sum(len(board.pieces(pt, chess.WHITE)) * val for pt, val in values.items())
    b_mat = sum(len(board.pieces(pt, chess.BLACK)) * val for pt, val in values.items())

    analysis = []
    if board.is_check():
        analysis.append("KING IN CHECK.")
    if pins:
        analysis.append(f"Pinned pieces at: {', '.join(pins)}.")
    analysis.append(f"Material: White {w_mat} vs Black {b_mat}.")
    return " ".join(analysis)
def tool_engine_analysis(board, time_limit=0.5):
    """Ask Stockfish for an evaluation and best move of ``board``.

    Args:
        board: A ``chess.Board`` to analyse.
        time_limit: Analysis time budget passed to ``chess.engine.Limit``.

    Returns:
        A ``(score, best_move, arrow)`` tuple of strings:

        * ``("GAME OVER", "NONE", "NONE")`` when the game has ended;
        * ``("0", "Engine Missing", "")`` when ``STOCKFISH_PATH`` does not exist;
        * ``("N/A", "ANALYSIS ERROR", "")`` when the engine call fails;
        * otherwise the score from White's point of view (``"MATE IN n"`` or
          the numeric score as text) and the best move as ``"e2 -> e4"``
          (``"NO MOVE"`` when the engine reported no principal variation),
          returned in both the second and third slot.
    """
    if board.is_game_over():
        return "GAME OVER", "NONE", "NONE"
    if not os.path.exists(STOCKFISH_PATH):
        return "0", "Engine Missing", ""
    try:
        with chess.engine.SimpleEngine.popen_uci(STOCKFISH_PATH) as engine:
            info = engine.analyse(board, chess.engine.Limit(time=time_limit))
        score = info["score"].white()
        if score.is_mate():
            score_val = f"MATE IN {abs(score.mate())}"
        else:
            score_val = str(score.score())
        # "pv" may be absent *or* an empty list; indexing an empty list used
        # to raise and throw away an otherwise valid score.
        pv = info.get("pv") or []
        best_move = pv[0] if pv else None
        if best_move:
            origin = chess.square_name(best_move.from_square)
            dest = chess.square_name(best_move.to_square)
            move_uci = f"{origin} -> {dest}"
        else:
            move_uci = "NO MOVE"
        return score_val, move_uci, move_uci
    except Exception:
        # Best-effort tool: any engine/process failure degrades to a sentinel
        # result, while KeyboardInterrupt/SystemExit still propagate.
        return "N/A", "ANALYSIS ERROR", ""
def tool_ai_play(board, level):
    """Let Stockfish pick a move for the side to move at the given difficulty.

    Args:
        board: A ``chess.Board``; it is not modified here.
        level: One of ``"Beginner"``, ``"Intermediate"``, ``"Advanced"`` or
            ``"Grandmaster"``; any other value falls back to ``"Beginner"``.

    Returns:
        The move chosen by the engine, or None when the game is already over,
        the engine binary is missing, or the engine call fails.
    """
    levels = {
        "Beginner": {"time": 0.01, "skill": 1, "depth": 1},
        "Intermediate": {"time": 0.1, "skill": 8, "depth": 6},
        "Advanced": {"time": 0.5, "skill": 15, "depth": 12},
        "Grandmaster": {"time": 1.0, "skill": 20, "depth": 18},
    }
    config = levels.get(level, levels["Beginner"])

    # Same pre-checks as tool_engine_analysis: don't spawn an engine for a
    # finished game or when the binary is not installed.
    if board.is_game_over() or not os.path.exists(STOCKFISH_PATH):
        return None
    try:
        with chess.engine.SimpleEngine.popen_uci(STOCKFISH_PATH) as engine:
            engine.configure({"Skill Level": config["skill"]})
            # Both the time and the depth limit are handed to the engine.
            result = engine.play(
                board,
                chess.engine.Limit(time=config["time"], depth=config["depth"]),
            )
            return result.move
    except Exception:
        # Best-effort: callers treat None as "no engine move available".
        return None
| # --- AUDIO & TRANSCRIPTION --- | |
def generate_voice(client, text):
    """Synthesise ``text`` to an mp3 under ``/tmp`` and return its path.

    Args:
        client: An OpenAI client (or None).
        text: The text to speak.

    Returns:
        Path of the written ``deepblue_<uuid>.mp3`` file, or None when there
        is no client, no text, or the request/write fails.
    """
    if not client or not text:
        return None
    path = os.path.join("/tmp", f"deepblue_{uuid.uuid4()}.mp3")
    try:
        response = client.audio.speech.create(
            model="tts-1", voice="onyx", input=text, speed=1.15
        )
        with open(path, "wb") as f:
            for chunk in response.iter_bytes():
                f.write(chunk)
    except Exception:
        # Voice is optional: swallow API/IO failures (but not
        # KeyboardInterrupt/SystemExit) and don't leave a truncated file.
        try:
            os.remove(path)
        except OSError:
            pass
        return None
    return path
def transcribe(client, audio_path):
    """Transcribe an English audio file with the ``whisper-1`` model.

    Args:
        client: An OpenAI client (or None).
        audio_path: Path of the recorded audio file.

    Returns:
        The transcript text, or None when there is no client, no path, the
        transcript is empty, or the file cannot be read / the request fails.
    """
    if not client or not audio_path:
        return None
    try:
        with open(audio_path, "rb") as f:
            result = client.audio.transcriptions.create(
                model="whisper-1", file=f, language="en"
            )
        return result.text or None
    except Exception:
        # Best-effort: callers treat None as "no question understood".
        # KeyboardInterrupt/SystemExit still propagate.
        return None
| # --- AGENT BRAIN --- | |
# System message sent with every chat completion request.
SYSTEM_PROMPT = """
You are DEEP BLUE AI.
PROTOCOL:
1. **COMMAND**: State the optimal move coordinate (e.g., "e2 -> e4").
2. **LOGIC**: Explain WHY using the tactical data provided (e.g., "Pins the Knight," "Avoids Mate").
TONE: Robotic, High-Tech, Concise.
IF PLAYER WINS: "CHECKMATE. SYSTEM SHUTDOWN."
"""


def _lookup_opening(board):
    """Return the name of the opening whose EPD matches ``board``, else "Unknown"."""
    if OPENINGS_DB.empty:
        return "Unknown"
    match = OPENINGS_DB[OPENINGS_DB["epd"] == board.epd()]
    if match.empty:
        return "Unknown"
    return match.iloc[0]["name"]


def agent_reasoning(fen, user_api_key, mode="auto", user_audio=None):
    """Produce the coach's text, voice clip and suggested move for a position.

    Args:
        fen: FEN string of the position to comment on.
        user_api_key: OpenAI key entered by the user (may be empty).
        mode: ``"question"`` to answer a spoken question; anything else asks
            for the move and its justification.
        user_audio: Path of the recorded question (used in question mode).

    Returns:
        A ``(text, audio_path, arrow)`` tuple. ``arrow`` is the engine's best
        move (``"e2 -> e4"``), or one of the markers ``"AUTH ERROR"`` (no API
        key), ``"END"`` (game over), ``""`` / ``"NO MOVE"`` (no engine move).
        ``audio_path`` is None when no voice clip was produced.
    """
    client = get_client(user_api_key)
    if not client:
        return "⚠️ API KEY REQUIRED. Please enter your OpenAI Key in the right column.", None, "AUTH ERROR"

    board = chess.Board(fen)
    if board.is_game_over():
        if board.is_checkmate():
            # The side to move is the one that got mated; Black is the engine.
            msg = "CHECKMATE. GAME OVER."
            msg += " HUMAN VICTORY." if board.turn == chess.BLACK else " SYSTEM VICTORY."
            return msg, generate_voice(client, msg), "END"
        return "DRAW.", None, "END"

    # TOOLS
    score, best_move_uci, arrow_visual = tool_engine_analysis(board)
    opening = _lookup_opening(board)
    tactics = analyze_tactics(board)
    side = "White" if board.turn == chess.WHITE else "Black"

    context = (
        "\n[SYSTEM TELEMETRY]\n"
        f"Turn: {side}. Score: {score}. Opening: {opening}.\n"
        f"[TACTICS] {tactics}\n"
        f"[OPTIMAL PATH] Move: {best_move_uci}.\n"
    )

    question = None
    if mode == "question" and user_audio:
        question = transcribe(client, user_audio)
    if question:
        context += f"\n[USER INPUT]: {question}"
    else:
        # Also used when transcription failed, so the model is never asked
        # to answer the literal text "None".
        context += "\nINSTRUCTION: Output move and tactical justification."

    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": context},
            ],
        )
        reply = response.choices[0].message.content or ""
    except Exception as e:
        # Don't send the error text to text-to-speech: that would be a second
        # request against an API that has just failed.
        return f"API Error: {e}", None, arrow_visual

    return reply, generate_voice(client, reply), arrow_visual
| # --- UI LOGIC --- | |
def update_log(new_advice, history_list):
    """Return the log with ``new_advice`` prepended as a ``"> ..."`` entry.

    The input list is never modified; a new list is returned so the caller's
    state object is not changed behind its back.

    Args:
        new_advice: Text to log. Empty/None values and the marker ``"END"``
            are not logged.
        history_list: Existing entries, newest first (None counts as empty).

    Returns:
        A new list: the existing entries, preceded by the new one if logged.
    """
    history = list(history_list or [])
    if not new_advice or new_advice == "END":
        return history
    return [f"> {new_advice}", *history]
def format_log(history_list):
    """Render the log entries as one string, separated by blank lines."""
    entry_separator = "\n" * 2
    return entry_separator.join(history_list)
def game_cycle(fen, level, user_key, history_list):
    """Handle a board change: play the engine's reply and produce coaching.

    Args:
        fen: FEN of the position after the latest board change.
        level: Difficulty name forwarded to ``tool_ai_play``.
        user_key: OpenAI key entered by the user.
        history_list: Strategy-log entries, newest first.

    Returns:
        ``(fen, text, audio_path, rendered_log, history_list)``: the position
        to show, the coach's text and voice clip, the log as display text and
        the log list to store.
    """
    board = chess.Board(fen)
    if board.is_game_over():
        text, audio, _ = agent_reasoning(fen, user_key)
        return fen, text, audio, format_log(history_list), history_list

    if board.turn != chess.BLACK:
        # White (the human) is to move: nothing to compute yet.
        return fen, "Awaiting Input...", None, format_log(history_list), history_list

    ai_move = tool_ai_play(board, level)
    if ai_move:
        board.push(ai_move)
    new_fen = board.fen()
    text, audio, arrow = agent_reasoning(new_fen, user_key, mode="auto")

    # Log real suggestions only. ``arrow`` can also be a marker such as
    # "END", "AUTH ERROR", "NO MOVE" or "", and wrapping it in "OPT: ..."
    # would slip past update_log's own "END"/empty filter.
    new_hist = history_list
    if arrow and " -> " in arrow:
        new_hist = update_log(f"OPT: {arrow}", history_list)
    return new_fen, text, audio, format_log(new_hist), new_hist
def reset_game():
    """Return the UI values for a fresh game.

    Returns:
        ``(fen, text, audio, rendered_log, history_list)``: the starting
        position, the reset message, no audio, an empty log display and an
        empty log list.
    """
    start_position = chess.STARTING_FEN
    status_text = "SYSTEM RESET."
    return start_position, status_text, None, "", []
def ask_agent(fen, user_key, audio, history_list):
    """Answer a spoken question about the current position.

    Returns:
        ``(text, audio_path, rendered_log, history_list)``; the log is passed
        through unchanged.
    """
    reply_text, reply_audio, _ = agent_reasoning(
        fen, user_key, mode="question", user_audio=audio
    )
    rendered_log = format_log(history_list)
    return reply_text, reply_audio, rendered_log, history_list
| # --- UI --- | |
# Dark theme overrides for the page, title, board, buttons and output boxes.
css = """
body { background-color: #020617; color: #e2e8f0; }
.gradio-container { background-color: #020617 !important; border: none; }
#title { color: #0ea5e9; text-align: center; font-family: 'Courier New', monospace; font-size: 4em; font-weight: 800; text-shadow: 0 0 10px rgba(14, 165, 233, 0.5); }
#board { border: 3px solid #0ea5e9; box-shadow: 0 0 30px rgba(14, 165, 233, 0.1); }
button.primary { background-color: #0ea5e9 !important; color: black !important; font-weight: bold; border: none; }
button.secondary { background-color: #1e293b !important; color: #94a3b8 !important; border: 1px solid #334155; }
.feedback { background-color: #0f172a !important; color: #38bdf8 !important; border: 1px solid #1e293b; font-family: 'Consolas', 'Monaco', monospace; }
"""

# NOTE(review): the container nesting below is reconstructed from statement
# order (the source had lost its indentation) — confirm against the intended
# layout.
with gr.Blocks(title="DEEP BLUE", css=css, theme=gr.themes.Base()) as demo:
    gr.Markdown("# Coach Deep Blue", elem_id="title")

    # API KEY INPUT
    api_key_input = gr.Textbox(
        label="🔑 OpenAI API Key (Optional if System Key set)",
        placeholder="sk-...",
        type="password",
    )
    level = gr.Dropdown(
        ["Beginner", "Intermediate", "Advanced", "Grandmaster"],
        value="Beginner",
        label="OPPONENT DIFFICULTY",
        interactive=True,
    )

    with gr.Row():
        # Left: the playable board.
        with gr.Column(scale=2):
            board = Chessboard(
                elem_id="board",
                label="Battle Zone",
                value=chess.STARTING_FEN,
                game_mode=True,
                interactive=True,
            )
        # Right: controls, coach output, log and microphone.
        with gr.Column(scale=1):
            btn_reset = gr.Button("INITIALIZE NEW SEQUENCE", variant="secondary")
            gr.Markdown("### 📟 SYSTEM OUTPUT")
            coach_txt = gr.Textbox(
                label="Analysis", interactive=False, lines=3, elem_classes="feedback"
            )
            coach_audio = gr.Audio(
                label="Voice",
                autoplay=True,
                interactive=False,
                type="filepath",
                visible=True,
            )
            gr.Markdown("### 📜 STRATEGY LOG")
            history_state = gr.State([])
            history_display = gr.Textbox(
                label="Log",
                interactive=False,
                lines=6,
                max_lines=10,
                elem_classes="feedback",
            )
            gr.Markdown("### 🎤 HUMAN INTERFACE")
            mic = gr.Audio(sources=["microphone"], type="filepath", show_label=False)
            btn_ask = gr.Button("QUERY SYSTEM", variant="primary")

    # A move on the board triggers the engine reply plus coaching.
    board.move(
        fn=game_cycle,
        inputs=[board, level, api_key_input, history_state],
        outputs=[board, coach_txt, coach_audio, history_display, history_state],
    )
    btn_reset.click(
        fn=reset_game,
        outputs=[board, coach_txt, coach_audio, history_display, history_state],
    )
    # The question handler runs both on the button and when a recording stops.
    for ask_trigger in (btn_ask.click, mic.stop_recording):
        ask_trigger(
            fn=ask_agent,
            inputs=[board, api_key_input, mic, history_state],
            outputs=[coach_txt, coach_audio, history_display, history_state],
        )

if __name__ == "__main__":
    # /tmp is allowed because generate_voice writes its mp3 files there.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        ssr_mode=False,
        allowed_paths=["/tmp"],
    )