File size: 10,419 Bytes
549e88f
dcacf8c
 
6c81ee2
 
64e02f6
05e856e
dcacf8c
0165fec
dcacf8c
216d49a
6c81ee2
 
216d49a
 
 
 
 
590e3aa
6c81ee2
590e3aa
 
 
 
 
 
 
 
53f46f8
590e3aa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
216d49a
 
 
590e3aa
6c81ee2
3aec18a
 
 
19145af
53f46f8
 
 
 
19145af
1a8dcf4
3aec18a
53f46f8
e0909e8
 
53f46f8
ef96755
0165fec
3aec18a
590e3aa
ef96755
590e3aa
e0909e8
3aec18a
0165fec
3aec18a
15c5c47
3aec18a
15c5c47
3aec18a
 
 
 
15c5c47
3aec18a
6c81ee2
 
15c5c47
 
 
 
53f46f8
6c81ee2
216d49a
 
 
dcacf8c
216d49a
 
afe4a85
d218eb0
e0909e8
b3f4332
 
 
19145af
590e3aa
19145af
 
216d49a
 
ef96755
 
216d49a
ef96755
 
 
 
216d49a
0165fec
590e3aa
3aec18a
590e3aa
216d49a
 
590e3aa
0165fec
05e856e
216d49a
 
 
 
 
19145af
 
0165fec
 
3aec18a
 
216d49a
 
0165fec
216d49a
3aec18a
590e3aa
 
 
 
 
 
e0909e8
590e3aa
216d49a
 
 
e0909e8
 
 
216d49a
590e3aa
e0909e8
590e3aa
e0909e8
3aec18a
216d49a
 
 
 
 
 
 
 
ef96755
216d49a
3aec18a
19145af
590e3aa
3aec18a
 
 
0165fec
 
3aec18a
0165fec
 
216d49a
0165fec
d218eb0
 
216d49a
3aec18a
ef96755
e0909e8
3aec18a
e0909e8
1a8dcf4
216d49a
3aec18a
 
1a8dcf4
3aec18a
e0909e8
ef96755
590e3aa
ef96755
216d49a
 
3aec18a
0165fec
216d49a
0165fec
3aec18a
 
216d49a
3aec18a
 
0165fec
216d49a
0165fec
dcacf8c
3aec18a
4ca406d
ab27405
 
 
 
 
0165fec
6c81ee2
3aec18a
216d49a
6c81ee2
3aec18a
0165fec
3aec18a
 
216d49a
0165fec
3aec18a
0165fec
216d49a
ef96755
3aec18a
ef96755
3aec18a
ef96755
216d49a
b3f4332
216d49a
 
dcacf8c
 
05e856e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import gradio as gr
from gradio_chessboard import Chessboard
import chess
import chess.engine
import pandas as pd
import os
import uuid
from openai import OpenAI
import time

# Fallback OpenAI key taken from the environment; None when the variable is unset.
SYSTEM_KEY = os.environ.get("OPENAI_API_KEY")
# Filesystem location where the Stockfish binary is expected.
STOCKFISH_PATH = "/usr/games/stockfish"

def get_client(user_key):
    """Build an OpenAI client from the user's key, falling back to the system key.

    Args:
        user_key: API key typed by the user; may be None, empty, or padded
            with whitespace.

    Returns:
        An ``OpenAI`` client, or None when neither the user key nor
        ``SYSTEM_KEY`` provides a non-empty value.
    """
    # Strip before choosing, so a whitespace-only entry counts as "no key"
    # and still falls back to SYSTEM_KEY instead of disabling the client.
    key = (user_key or "").strip() or SYSTEM_KEY
    if not key:
        return None
    return OpenAI(api_key=key)

def _load_lichess_openings(path_prefix="/app/data/lichess_openings/dist/"):
    """Load the Lichess opening tables (volumes a-e) into one DataFrame.

    Args:
        path_prefix: String prepended to ``"<vol>.tsv"`` for each volume, so
            a directory prefix must end with a path separator.

    Returns:
        A DataFrame with the ``eco``, ``name``, ``pgn``, ``uci`` and ``epd``
        columns of every volume that could be read, or an empty DataFrame
        when none could be loaded.
    """
    columns = ["eco", "name", "pgn", "uci", "epd"]
    frames = []
    for vol in ("a", "b", "c", "d", "e"):
        filename = f"{path_prefix}{vol}.tsv"
        if not os.path.exists(filename):
            continue
        try:
            frames.append(pd.read_csv(filename, sep="\t", usecols=columns))
        except (OSError, ValueError):
            # pandas reports parse errors, empty files and missing columns as
            # ValueError subclasses; skip only the offending volume so the
            # readable ones are still returned.
            continue
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

# Opening table loaded once at import time; it is an empty DataFrame when
# _load_lichess_openings could not read any data file.
OPENINGS_DB = _load_lichess_openings()

def analyze_tactics(board):
    """Summarise check status, pins and material for a position.

    Args:
        board: A ``chess.Board`` position.

    Returns:
        One space-joined string containing, in order: "KING IN CHECK." when
        ``board.is_check()`` is true, the squares of the side-to-move's
        pinned pieces when there are any, and the material totals of both
        sides (pawn 1, knight 3, bishop 3, rook 5, queen 9).
    """
    turn = board.turn

    # Only pieces of the side to move are tested for pins.
    pins = []
    for sq in chess.SQUARES:
        piece = board.piece_at(sq)
        if piece and piece.color == turn and board.is_pinned(turn, sq):
            pins.append(chess.square_name(sq))

    values = {chess.PAWN: 1, chess.KNIGHT: 3, chess.BISHOP: 3, chess.ROOK: 5, chess.QUEEN: 9}
    w_mat = sum(len(board.pieces(pt, chess.WHITE)) * val for pt, val in values.items())
    b_mat = sum(len(board.pieces(pt, chess.BLACK)) * val for pt, val in values.items())

    analysis = []
    if board.is_check():
        analysis.append("KING IN CHECK.")
    if pins:
        analysis.append(f"Pinned pieces at: {', '.join(pins)}.")
    analysis.append(f"Material: White {w_mat} vs Black {b_mat}.")
    return " ".join(analysis)

def tool_engine_analysis(board, time_limit=0.5):
    """Ask Stockfish for an evaluation and a best move for *board*.

    Args:
        board: A ``chess.Board`` position.
        time_limit: Seconds passed to ``chess.engine.Limit(time=...)``.

    Returns:
        A ``(score, move, arrow)`` tuple of strings. ``score`` is taken from
        White's point of view: "MATE IN n" when the engine reports a mate,
        otherwise the string form of ``score.score()``. ``move`` and
        ``arrow`` are the same value, ``"<from> -> <to>"`` or "NO MOVE".
        Sentinel tuples are returned when the game is over
        (``"GAME OVER", "NONE", "NONE"``), when the engine binary is missing
        (``"0", "Engine Missing", ""``) or when analysis fails
        (``"N/A", "ANALYSIS ERROR", ""``).
    """
    if board.is_game_over():
        return "GAME OVER", "NONE", "NONE"
    if not os.path.exists(STOCKFISH_PATH):
        return "0", "Engine Missing", ""

    try:
        with chess.engine.SimpleEngine.popen_uci(STOCKFISH_PATH) as engine:
            info = engine.analyse(board, chess.engine.Limit(time=time_limit))
            score = info["score"].white()

            if score.is_mate():
                score_val = f"MATE IN {abs(score.mate())}"
            else:
                score_val = str(score.score())

            # A missing or empty principal variation must not discard the
            # score: report "NO MOVE" instead of failing on pv[0].
            pv = info.get("pv") or []
            best_move = pv[0] if pv else None
            if best_move:
                origin = chess.square_name(best_move.from_square)
                dest = chess.square_name(best_move.to_square)
                move_uci = f"{origin} -> {dest}"
            else:
                move_uci = "NO MOVE"

            return score_val, move_uci, move_uci
    except Exception:
        # Best effort: any engine or process failure degrades to a sentinel,
        # while KeyboardInterrupt/SystemExit are no longer swallowed.
        return "N/A", "ANALYSIS ERROR", ""

def tool_ai_play(board, level):
    """Let Stockfish choose a move for the side to move at the given difficulty.

    Args:
        board: A ``chess.Board`` position.
        level: One of "Beginner", "Intermediate", "Advanced" or
            "Grandmaster"; any other value uses the "Beginner" settings.

    Returns:
        The move from ``engine.play(...)``, or None when the engine could
        not be started or failed while playing.
    """
    levels = {
        "Beginner": {"time": 0.01, "skill": 1, "depth": 1},
        "Intermediate": {"time": 0.1, "skill": 8, "depth": 6},
        "Advanced": {"time": 0.5, "skill": 15, "depth": 12},
        "Grandmaster": {"time": 1.0, "skill": 20, "depth": 18},
    }
    config = levels.get(level, levels["Beginner"])
    try:
        with chess.engine.SimpleEngine.popen_uci(STOCKFISH_PATH) as engine:
            engine.configure({"Skill Level": config["skill"]})
            limit = chess.engine.Limit(time=config["time"], depth=config["depth"])
            return engine.play(board, limit).move
    except Exception:
        # Best effort: callers treat None as "no move available".
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return None

# --- AUDIO & TRANSCRIPTION ---
def generate_voice(client, text):
    """Synthesise *text* to an mp3 file with the OpenAI speech API.

    Args:
        client: OpenAI client; when falsy no request is made.
        text: Text to speak; when empty no request is made.

    Returns:
        Path of the generated ``/tmp/deepblue_<uuid>.mp3`` file, or None
        when nothing was requested or the request/write failed.
    """
    if not client or not text:
        return None

    path = os.path.join("/tmp", f"deepblue_{uuid.uuid4()}.mp3")
    try:
        response = client.audio.speech.create(
            model="tts-1", voice="onyx", input=text, speed=1.15
        )
        with open(path, "wb") as f:
            for chunk in response.iter_bytes():
                f.write(chunk)
    except Exception:
        # Best effort: do not leave a truncated mp3 behind when the request
        # or the download fails part-way through.
        try:
            os.remove(path)
        except OSError:
            pass
        return None
    return path

def transcribe(client, audio_path):
    """Transcribe a recorded audio file with the OpenAI "whisper-1" model.

    Args:
        client: OpenAI client; when falsy no request is made.
        audio_path: Path of the audio file; when empty no request is made.

    Returns:
        The transcription text, or None when nothing was requested, the
        file could not be opened, or the request failed.
    """
    if not client or not audio_path:
        return None
    try:
        with open(audio_path, "rb") as f:
            result = client.audio.transcriptions.create(
                model="whisper-1", file=f, language="en"
            )
        return result.text
    except Exception:
        # Best effort: callers treat None as "no transcription".
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return None

# --- AGENT BRAIN ---
# System message sent as the first chat message of every completion request
# made by agent_reasoning.
SYSTEM_PROMPT = """
You are DEEP BLUE AI.
PROTOCOL:
1. **COMMAND**: State the optimal move coordinate (e.g., "e2 -> e4").
2. **LOGIC**: Explain WHY using the tactical data provided (e.g., "Pins the Knight," "Avoids Mate").
TONE: Robotic, High-Tech, Concise.
IF PLAYER WINS: "CHECKMATE. SYSTEM SHUTDOWN."
"""

def agent_reasoning(fen, user_api_key, mode="auto", user_audio=None):
    """Produce the coach's written and spoken advice for a position.

    Args:
        fen: FEN string of the position to analyse.
        user_api_key: OpenAI key supplied by the user, resolved through
            ``get_client``.
        mode: "question" answers the user's recorded question; any other
            value requests the default move recommendation.
        user_audio: Path of the recorded question; only used in "question"
            mode.

    Returns:
        A ``(text, audio_path, arrow)`` tuple. ``arrow`` is the best-move
        string from ``tool_engine_analysis``, "AUTH ERROR" when no client
        is available, or "END" when the game is over. ``audio_path`` may be
        None.
    """
    client = get_client(user_api_key)
    if not client:
        return "⚠️ API KEY REQUIRED. Please enter your OpenAI Key in the right column.", None, "AUTH ERROR"

    board = chess.Board(fen)

    if board.is_game_over():
        if board.is_checkmate():
            # The side to move is the mated one; Black is the engine's side
            # (game_cycle plays the engine move when it is Black's turn).
            msg = "CHECKMATE. GAME OVER."
            msg += " HUMAN VICTORY." if board.turn == chess.BLACK else " SYSTEM VICTORY."
            return msg, generate_voice(client, msg), "END"
        return "DRAW.", None, "END"

    # TOOLS
    score, best_move_uci, arrow_visual = tool_engine_analysis(board)
    opening = "Unknown"
    if not OPENINGS_DB.empty:
        match = OPENINGS_DB[OPENINGS_DB["epd"] == board.epd()]
        if not match.empty:
            opening = match.iloc[0]['name']
    tactics = analyze_tactics(board)

    context = f"""
    [SYSTEM TELEMETRY]
    Turn: {'White' if board.turn == chess.WHITE else 'Black'}. Score: {score}. Opening: {opening}.
    [TACTICS] {tactics}
    [OPTIMAL PATH] Move: {best_move_uci}.
    """

    question = None
    if mode == "question" and user_audio:
        question = transcribe(client, user_audio)
    if question:
        context += f"\n[USER INPUT]: {question}"
    else:
        # Also covers a failed transcription, so the literal text "None" is
        # never sent to the model as the user's question.
        context += "\nINSTRUCTION: Output move and tactical justification."

    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": context}]
        )
        reply = response.choices[0].message.content or ""
    except Exception as e:
        # Show the error as text only; there is no point in spending a
        # speech request on reading an error message aloud.
        return f"API Error: {e}", None, arrow_visual

    audio = generate_voice(client, reply)
    return reply, audio, arrow_visual

# --- UI LOGIC ---
def update_log(new_advice, history_list):
    """Prepend a "> "-prefixed entry to the log and return the same list.

    Nothing is added when *new_advice* is empty or equals "END". The list is
    modified in place.
    """
    loggable = bool(new_advice) and new_advice != "END"
    if loggable:
        # Newest entry goes first.
        history_list[:0] = [f"> {new_advice}"]
    return history_list

def format_log(history_list):
    """Render the log entries as one string separated by blank lines."""
    separator = "\n\n"
    return separator.join(history_list)

def game_cycle(fen, level, user_key, history_list):
    """Advance the game after a board move: reply for Black, then coach.

    Args:
        fen: FEN string of the position after the move on the board.
        level: Difficulty name forwarded to ``tool_ai_play``.
        user_key: OpenAI key forwarded to ``agent_reasoning``.
        history_list: Current strategy-log entries.

    Returns:
        A ``(fen, text, audio, log_text, history_list)`` tuple matching the
        outputs wired to the board's move event.
    """
    board = chess.Board(fen)

    if board.is_game_over():
        text, audio, _ = agent_reasoning(fen, user_key)
        return fen, text, audio, format_log(history_list), history_list

    if board.turn != chess.BLACK:
        return fen, "Awaiting Input...", None, format_log(history_list), history_list

    ai_move = tool_ai_play(board, level)
    if not ai_move:
        # Without an engine reply it is still Black's turn; coaching now
        # would recommend a move for the opponent, so report the failure.
        return fen, "ENGINE ERROR. OPPONENT MOVE UNAVAILABLE.", None, format_log(history_list), history_list
    board.push(ai_move)

    text, audio, arrow = agent_reasoning(board.fen(), user_key, mode="auto")
    # Only real move suggestions belong in the log; the status sentinels
    # must be filtered here because the "OPT: " prefix hides them from
    # update_log's own check.
    non_moves = ("END", "AUTH ERROR", "NO MOVE", "NONE")
    if arrow and arrow not in non_moves:
        history_list = update_log(f"OPT: {arrow}", history_list)
    return board.fen(), text, audio, format_log(history_list), history_list

def reset_game():
    """Return the UI values for a fresh game.

    The tuple is (starting FEN, status text, no audio, empty log text,
    empty log list).
    """
    start_position = chess.STARTING_FEN
    status = "SYSTEM RESET."
    return start_position, status, None, "", []

def ask_agent(fen, user_key, audio, history_list):
    """Answer the user's recorded question about the current position.

    Returns (reply text, reply audio, formatted log, log list); the log is
    passed through unchanged.
    """
    reply_text, reply_audio, _arrow = agent_reasoning(
        fen,
        user_key,
        mode="question",
        user_audio=audio,
    )
    log_text = format_log(history_list)
    return reply_text, reply_audio, log_text, history_list

# --- UI ---
# Custom stylesheet passed to gr.Blocks below. It styles the page body, the
# #title and #board elements, primary/secondary buttons and the .feedback
# class used by the text boxes.
css = """
body { background-color: #020617; color: #e2e8f0; }
.gradio-container { background-color: #020617 !important; border: none; }
#title { color: #0ea5e9; text-align: center; font-family: 'Courier New', monospace; font-size: 4em; font-weight: 800; text-shadow: 0 0 10px rgba(14, 165, 233, 0.5); }
#board { border: 3px solid #0ea5e9; box-shadow: 0 0 30px rgba(14, 165, 233, 0.1); }
button.primary { background-color: #0ea5e9 !important; color: black !important; font-weight: bold; border: none; }
button.secondary { background-color: #1e293b !important; color: #94a3b8 !important; border: 1px solid #334155; }
.feedback { background-color: #0f172a !important; color: #38bdf8 !important; border: 1px solid #1e293b; font-family: 'Consolas', 'Monaco', monospace; }
"""

# Build the Gradio interface: key and difficulty inputs on top, the board on
# the left, and the coach output, strategy log and microphone on the right.
with gr.Blocks(title="DEEP BLUE", css=css, theme=gr.themes.Base()) as demo:
    gr.Markdown("# Coach Deep Blue", elem_id="title")

    # API KEY INPUT
    api_key_input = gr.Textbox(label="🔑 OpenAI API Key (Optional if System Key set)", placeholder="sk-...", type="password")
    # Difficulty names must match the keys of the levels table in tool_ai_play.
    level = gr.Dropdown(["Beginner", "Intermediate", "Advanced", "Grandmaster"], value="Beginner", label="OPPONENT DIFFICULTY", interactive=True)
            
    with gr.Row():
        with gr.Column(scale=2):
            board = Chessboard(elem_id="board", label="Battle Zone", value=chess.STARTING_FEN, game_mode=True, interactive=True)
        
        with gr.Column(scale=1):
            btn_reset = gr.Button("INITIALIZE NEW SEQUENCE", variant="secondary")
            
            gr.Markdown("### 📟 SYSTEM OUTPUT")
            coach_txt = gr.Textbox(label="Analysis", interactive=False, lines=3, elem_classes="feedback")
            coach_audio = gr.Audio(label="Voice", autoplay=True, interactive=False, type="filepath", visible=True)
            
            gr.Markdown("### 📜 STRATEGY LOG")
            # history_state holds the list of log entries; history_display shows its formatted text.
            history_state = gr.State([])
            history_display = gr.Textbox(label="Log", interactive=False, lines=6, max_lines=10, elem_classes="feedback")
            
            gr.Markdown("### 🎤 HUMAN INTERFACE")
            mic = gr.Audio(sources=["microphone"], type="filepath", show_label=False)
            btn_ask = gr.Button("QUERY SYSTEM", variant="primary")

    # Every board move runs game_cycle, which updates the board, coach output and log.
    board.move(fn=game_cycle, inputs=[board, level, api_key_input, history_state], outputs=[board, coach_txt, coach_audio, history_display, history_state])
    # Reset restores the starting position and clears the outputs and the log.
    btn_reset.click(fn=reset_game, outputs=[board, coach_txt, coach_audio, history_display, history_state])
    # ask_agent is triggered both by the button and when a microphone recording stops.
    btn_ask.click(fn=ask_agent, inputs=[board, api_key_input, mic, history_state], outputs=[coach_txt, coach_audio, history_display, history_state])
    mic.stop_recording(fn=ask_agent, inputs=[board, api_key_input, mic, history_state], outputs=[coach_txt, coach_audio, history_display, history_state])

# Script entry point: listen on all interfaces at port 7860. /tmp is listed
# in allowed_paths, presumably so the mp3 files generate_voice writes there
# can be served to the browser.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False, allowed_paths=["/tmp"])