# app.py from flask import Flask, render_template, send_from_directory, request from flask_socketio import SocketIO, emit from collections import defaultdict import os app = Flask(__name__) app.config['SECRET_KEY'] = 'your-secret-key' socketio = SocketIO(app, cors_allowed_origins="*") # polling OK with Werkzeug # Read password from Hugging Face Space secret (prefer 'password', fallback to 'PASSWORD') SPACE_PASSWORD = os.environ.get('password') or os.environ.get('PASSWORD') or None # -------------------- # Game state # -------------------- class GoGame: def __init__(self, size=13): # (unchanged) self.size = size self.board = [[None for _ in range(size)] for _ in range(size)] self.current_player = 'black' self.captured = {'black': 0, 'white': 0} # stones captured OF that color self.passes = 0 self.game_over = False self.scores = {'black': 0, 'white': 0} self.game_history = {'black': 0, 'white': 0} # per-color tally (kept for reference) self.last_move = None def place_stone(self, x, y, color): if self.game_over: return False if not (0 <= x < self.size and 0 <= y < self.size): return False if self.board[x][y] is not None: return False self.board[x][y] = color self.last_move = (x, y) opponent = 'white' if color == 'black' else 'black' removed = self.check_captures(x, y, opponent) self.captured[opponent] += len(removed) # suicide check (simplified) if not self.has_liberties(x, y, color): self.board[x][y] = None return False self.current_player = opponent self.passes = 0 return True def check_captures(self, x, y, opponent): captured = [] for dx, dy in [(0,1), (1,0), (0,-1), (-1,0)]: nx, ny = x + dx, y + dy if 0 <= nx < self.size and 0 <= ny < self.size: if self.board[nx][ny] == opponent and not self.has_liberties(nx, ny, opponent): captured.extend(self.remove_group(nx, ny, opponent)) return captured def has_liberties(self, x, y, color): visited = set() return self._has_liberties_recursive(x, y, color, visited) def _has_liberties_recursive(self, x, y, color, visited): if (x, y) in visited: return False visited.add((x, y)) for dx, dy in [(0,1), (1,0), (0,-1), (-1,0)]: nx, ny = x + dx, y + dy if 0 <= nx < self.size and 0 <= ny < self.size: if self.board[nx][ny] is None: return True if self.board[nx][ny] == color and self._has_liberties_recursive(nx, ny, color, visited): return True return False def remove_group(self, x, y, color): visited = set() self._remove_group_recursive(x, y, color, visited) return list(visited) def _remove_group_recursive(self, x, y, color, visited): if (x, y) in visited: return if not (0 <= x < self.size and 0 <= y < self.size): return if self.board[x][y] != color: return visited.add((x, y)) self.board[x][y] = None for dx, dy in [(0,1), (1,0), (0,-1), (-1,0)]: self._remove_group_recursive(x + dx, y + dy, color, visited) def pass_turn(self): if self.game_over: return self.passes += 1 self.current_player = 'white' if self.current_player == 'black' else 'black' if self.passes >= 2: self.end_game() def resign(self, player_color): winner = 'white' if player_color == 'black' else 'black' self.game_history[winner] += 1 # per-color history (kept) self.game_over = True return winner def end_game(self): # simple scoring: stones on board + captured; komi for white black_score = self.captured['black'] white_score = self.captured['white'] + 6.5 for row in self.board: for cell in row: if cell == 'black': black_score += 1 elif cell == 'white': white_score += 1 self.scores['black'] = int(black_score) self.scores['white'] = int(white_score) if black_score > white_score: self.game_history['black'] += 1 else: self.game_history['white'] += 1 self.game_over = True def reset(self, keep_history=True): """Reset board/state; preserve per-color history by default.""" hist = self.game_history if keep_history else {'black': 0, 'white': 0} self.board = [[None for _ in range(self.size)] for _ in range(self.size)] self.current_player = 'black' self.captured = {'black': 0, 'white': 0} self.passes = 0 self.game_over = False self.scores = {'black': 0, 'white': 0} self.last_move = None self.game_history = hist # -------------------- # Globals # -------------------- game = GoGame(size=13) sid_to_username = {} # sid -> username current_player_user = {'black': None, 'white': None} # color -> username wins_by_user = defaultdict(int) # username -> wins # -------------------- # Helpers # -------------------- def winner_username_for_color(color): return current_player_user.get(color) def snapshot(): return { 'board_size': game.size, 'current_player': game.current_player, 'scores': game.scores, 'game_history': game.game_history, # per-color (legacy) 'wins_by_user': dict(wins_by_user), # username-based (authoritative for UI) 'board': game.board, 'last_move': game.last_move, 'captured': game.captured, 'passes': game.passes, 'game_over': game.game_over } def broadcast_colors(): socketio.emit('colors', { 'black': current_player_user['black'], 'white': current_player_user['white'] }) def require_auth(): """Ensure the current socket is authenticated via successful join.""" if request.sid not in sid_to_username: emit('error', {'message': 'Not authenticated'}) return False return True # -------------------- # HTTP # -------------------- @app.route('/') def index(): return render_template('index.html') @app.route('/') def static_files(path): return send_from_directory('.', path) # -------------------- # Socket events # -------------------- @socketio.on('connect') def on_connect(): print('Client connected', request.sid) @socketio.on('disconnect') def on_disconnect(): sid = request.sid username = sid_to_username.pop(sid, None) print('Client disconnected', sid, username) # Do not clear color mapping on disconnect; persists until new_game. @socketio.on('join') def on_join(data): username = (data or {}).get('username') provided_password = (data or {}).get('password') # Enforce password if configured if SPACE_PASSWORD and provided_password != SPACE_PASSWORD: emit('error', {'message': 'Invalid password'}) return if not username: emit('error', {'message': 'Username required'}) return sid_to_username[request.sid] = username emit('init', snapshot()) broadcast_colors() @socketio.on('claim_color') def on_claim_color(data): if not require_auth(): return username = (data or {}).get('username') color = (data or {}).get('color') if color not in ('black', 'white'): emit('error', {'message': 'Bad color'}) return other = 'white' if color == 'black' else 'black' # user cannot hold both colors if current_player_user.get(other) == username: emit('error', {'message': 'You already claimed the other color'}) return # idempotent if current_player_user.get(color) == username: broadcast_colors() return # claim if free if current_player_user.get(color) is None: current_player_user[color] = username broadcast_colors() else: emit('error', {'message': f'{color} already taken'}) @socketio.on('move') def on_move(data): if not require_auth(): return x = int((data or {}).get('x', -1)) y = int((data or {}).get('y', -1)) user = (data or {}).get('player') moving_color = game.current_player if user != current_player_user.get(moving_color): emit('error', {'message': 'Not your turn!'}) return if game.place_stone(x, y, moving_color): socketio.emit('move', { 'x': x, 'y': y, 'player': moving_color, 'next_player': game.current_player, 'captured': game.captured }) else: emit('error', {'message': 'Invalid move!'}) @socketio.on('pass') def on_pass(data): if not require_auth(): return user = (data or {}).get('player') if user != current_player_user.get(game.current_player): emit('error', {'message': 'Not your turn!'}) return game.pass_turn() socketio.emit('pass', {'next_player': game.current_player}) if game.game_over: # award win to username (if not a draw) if game.scores['black'] != game.scores['white']: winner_color = 'black' if game.scores['black'] > game.scores['white'] else 'white' wuser = winner_username_for_color(winner_color) if wuser: wins_by_user[wuser] += 1 socketio.emit('game_over', { 'scores': game.scores, 'game_history': game.game_history, 'wins_by_user': dict(wins_by_user) }) @socketio.on('resign') def on_resign(data): if not require_auth(): return user = (data or {}).get('player') # find resigning player's color player_color = None for color, uname in current_player_user.items(): if uname == user: player_color = color break if not player_color: emit('error', {'message': 'You have not claimed a color'}) return winner_color = game.resign(player_color) wuser = winner_username_for_color(winner_color) if wuser: wins_by_user[wuser] += 1 socketio.emit('resign', { 'winner': winner_color, 'scores': game.scores, 'game_history': game.game_history, 'wins_by_user': dict(wins_by_user) }) @socketio.on('new_game') def on_new_game(data): if not require_auth(): return # reset board; keep username win history and per-color history game.reset(keep_history=True) current_player_user['black'] = None current_player_user['white'] = None socketio.emit('init', snapshot()) broadcast_colors() # -------------------- # Run # -------------------- if __name__ == '__main__': socketio.run(app, host='0.0.0.0', port=int(os.environ.get('PORT', 7860)), allow_unsafe_werkzeug=True)