# Wild Fire Tracker - Fire Detection MCP Server
# Copyright (c) 2024 Wild Fire Tracker
# Licensed under MIT License - see LICENSE file for details

import gradio as gr
import cv2
import numpy as np
import threading
import time
from datetime import datetime
from PIL import Image
from transformers import BlipProcessor, BlipForQuestionAnswering
import torch

# Load BLIP VQA model (Salesforce/blip-vqa-base)
print("Loading BLIP VQA model...")
device = "cuda" if torch.cuda.is_available() else "cpu"
# fp16 halves GPU memory; fall back to fp32 on CPU, where fp16 ops are poorly supported
dtype = torch.float16 if device == "cuda" else torch.float32
vqa_processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
vqa_model = BlipForQuestionAnswering.from_pretrained("Salesforce/blip-vqa-base", torch_dtype=dtype)
vqa_model = vqa_model.to(device)
print(f"Model loaded on {device}")

class FireDetectionMCP:
    def __init__(self):
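        # State shared between the background reader thread (writer) and the
        # Gradio timer callback (reader); attribute assignment is atomic in CPython.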
        self.running = False
        self.current_frame = None
        self.status = "No video source"
        self.status_color = "#808080"  # Gray
        self.last_analysis_time = 0
        self.frame_count = 0
        self.last_detection_time = None
        self.display_status = "No video source"  # For video overlay (no emojis)
        
    def analyze_frame(self, frame):
        """Analyze frame for fire/smoke"""
        try:
            # Convert to PIL Image
            image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            image = image.resize((224, 224))
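            # Note: BlipProcessor resizes again to the model's own input size, so this
            # pre-resize mainly caps preprocessing cost at the price of some detail.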
            
            # Ask multiple questions for better detection
            fire_question = "Is there fire or flames in this image?"
            smoke_question = "Is there smoke in this image?"
            
            # Check for fire with confidence
            fire_inputs = vqa_processor(image, fire_question, return_tensors="pt").to(device)
            fire_inputs["pixel_values"] = fire_inputs["pixel_values"].to(dtype)  # match model dtype (fp16 on GPU)
            with torch.no_grad():
                fire_outputs = vqa_model.generate(**fire_inputs, max_length=10, return_dict_in_generate=True, output_scores=True)
            fire_answer = vqa_processor.decode(fire_outputs.sequences[0], skip_special_tokens=True).lower()
            # Rough confidence proxy: max softmax probability over the first generated token
            fire_confidence = torch.softmax(fire_outputs.scores[0][0], dim=0).max().item() * 100
            
            # Check for smoke with confidence
            smoke_inputs = vqa_processor(image, smoke_question, return_tensors="pt").to(device)
            smoke_inputs["pixel_values"] = smoke_inputs["pixel_values"].to(dtype)  # match model dtype
            with torch.no_grad():
                smoke_outputs = vqa_model.generate(**smoke_inputs, max_length=10, return_dict_in_generate=True, output_scores=True)
            smoke_answer = vqa_processor.decode(smoke_outputs.sequences[0], skip_special_tokens=True).lower()
            smoke_confidence = torch.softmax(smoke_outputs.scores[0][0], dim=0).max().item() * 100
            
            # Determine result (BLIP usually answers "yes"/"no"; avoid matching "no fire")
            has_fire = fire_answer.startswith('yes') or fire_answer in ('fire', 'flames', 'flame')
            has_smoke = smoke_answer.startswith('yes') or smoke_answer == 'smoke'
            
            if has_fire and has_smoke:
                status_with_emoji = f"🔥💨 FIRE & SMOKE DETECTED (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                status_no_emoji = f"FIRE & SMOKE DETECTED (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                return status_with_emoji, status_no_emoji, "#FF0000"  # Red
            elif has_fire:
                status_with_emoji = f"🔥 FIRE DETECTED ({fire_confidence:.0f}%)"
                status_no_emoji = f"FIRE DETECTED ({fire_confidence:.0f}%)"
                return status_with_emoji, status_no_emoji, "#FF4500"  # Orange
            elif has_smoke:
                status_with_emoji = f"💨 SMOKE DETECTED ({smoke_confidence:.0f}%)"
                status_no_emoji = f"SMOKE DETECTED ({smoke_confidence:.0f}%)"
                return status_with_emoji, status_no_emoji, "#696969"  # Gray
            else:
                status_with_emoji = f"✅ ALL CLEAR (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                status_no_emoji = f"ALL CLEAR (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                return status_with_emoji, status_no_emoji, "#32CD32"  # Green
                
        except Exception as e:
            # Match the 3-tuple shape of the success paths: (status, overlay status, color)
            error_msg = f"ERROR: {str(e)}"
            return f"❌ {error_msg}", error_msg, "#FF0000"  # Red
    
    def monitor_video(self, video_source):
        """Monitor a video source (webcam index, RTSP URL, or file path)"""
        if isinstance(video_source, str):
            video_source = video_source.strip()
        if isinstance(video_source, str) and video_source.isdigit():
            cap = cv2.VideoCapture(int(video_source))  # numeric string -> webcam index
        else:
            cap = cv2.VideoCapture(video_source)
            
        if not cap.isOpened():
            self.status = "❌ Cannot open video source"
            self.display_status = "Cannot open video source"
            self.status_color = "#FF0000"
            return
        
        # Check if MP4 for looping
        is_mp4 = isinstance(video_source, str) and video_source.lower().endswith('.mp4')
        
        self.running = True
        self.frame_count = 0
        
        while self.running:
            ret, frame = cap.read()
            
            # Loop MP4 files
            if not ret and is_mp4:
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret, frame = cap.read()
                self.frame_count = 0
            
            if not ret:
                break
                
            self.frame_count += 1
            current_time = time.time()
            
            # Resize for display
            display_frame = cv2.resize(frame, (640, 480))
            
            # Analyze every 10 seconds (only if still running)
            if self.running and current_time - self.last_analysis_time >= 10.0:
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Analyzing frame {self.frame_count}...")
                self.status, self.display_status, self.status_color = self.analyze_frame(frame)
                self.last_analysis_time = current_time
                self.last_detection_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Result: {self.status}")
            
            # Add status overlay
            cv2.rectangle(display_frame, (0, 0), (640, 80), (0, 0, 0), -1)
            
            # Convert hex color to BGR
            if self.status_color == "#32CD32":  # Green
                color = (50, 205, 50)
            elif self.status_color == "#FF4500":  # Orange
                color = (0, 69, 255)
            elif self.status_color == "#696969":  # Gray
                color = (105, 105, 105)
            else:  # Red
                color = (0, 0, 255)
            
            # cv2.putText cannot render emojis, so draw the emoji-free status here;
            # the emoji version (self.status) appears only in the Gradio textbox.
            cv2.putText(display_frame, self.display_status, (10, 60),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            
            # Add full timestamp
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            cv2.putText(display_frame, f"Time: {timestamp}", (10, 460), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
            
            # Store frame
            self.current_frame = cv2.cvtColor(display_frame, cv2.COLOR_BGR2RGB)
            
            time.sleep(0.04)  # ~25 FPS
        
        cap.release()
        self.status = "Monitoring stopped"
        self.status_color = "#808080"
    
    def start_monitoring(self, video_source):
        """Start monitoring in thread"""
        if self.running:
            return "Already monitoring"
        
        if not video_source or (isinstance(video_source, str) and not video_source.strip()):
            return "Please provide a video source"
        
        thread = threading.Thread(target=self.monitor_video, args=(video_source,), daemon=True)
        thread.start()
        
        return f"βœ… Started monitoring: {video_source}"
    
    def stop_monitoring(self):
        """Stop monitoring"""
        self.running = False
        self.current_frame = None
        self.status = "πŸ›‘ Monitoring stopped"
        self.display_status = "Monitoring stopped"
        self.status_color = "#808080"
        return "πŸ›‘ Monitoring stopped"
    
    def get_frame(self):
        """Get current frame"""
        if self.current_frame is not None:
            return self.current_frame
        else:
            # Placeholder
            placeholder = np.zeros((480, 640, 3), dtype=np.uint8)
            cv2.putText(placeholder, "Waiting for video stream...", (150, 240), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
            return placeholder
    
    def get_status(self):
        """Get current status"""
        if self.last_detection_time:
            return f"{self.status} (Last check: {self.last_detection_time})"
        return self.status

# Initialize MCP server
mcp_server = FireDetectionMCP()
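
# Quick sanity check without the UI (hypothetical image path; uncomment to try):
# frame = cv2.imread("samples/fire_test.jpg")  # any BGR image OpenCV can read
# if frame is not None:
#     status, overlay_status, color = mcp_server.analyze_frame(frame)
#     print(status)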

def create_interface():
    """Create Gradio interface"""
    
    with gr.Blocks(title="🔥 Fire Detection MCP Server", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# 🔥 Fire Detection MCP Server")
        gr.Markdown("Real-time fire and smoke detection from video streams (analyzes every 10 seconds)")
        gr.Markdown("⚠️ **Usage**: Upload your own video file or use live sources (webcam/RTSP). It may take few seconds to load stream and show analysis. Webcam may not work on HF Spaces.")
        gr.Markdown("πŸ”— **Sample Videos**: [Fire Test Video](https://www.pexels.com/video/a-man-carrying-gear-walking-away-from-a-controlled-fire-8552246/) | [Smoke Test Video](https://www.pexels.com/video/aerial-view-of-controlled-forest-fire-in-spring-31361444/)")

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### Video Source Options")
                
                with gr.Tabs():
                    with gr.Tab("πŸ“ Upload Video"):
                        video_upload = gr.File(
                            label="Upload MP4 Video",
                            file_types=[".mp4", ".avi", ".mov"],
                            type="filepath"
                        )
                        upload_btn = gr.Button("🚀 Start Monitoring", variant="primary")
                    
                    with gr.Tab("πŸ“Ή Live Sources"):
                        video_input = gr.Textbox(
                            label="Video Source",
                            placeholder="0 (webcam), rtsp://url, or path/to/video.mp4",
                            value="0"
                        )
                        live_btn = gr.Button("🚀 Start Monitoring", variant="primary")
                
                stop_btn = gr.Button("🛑 Stop Monitoring", variant="secondary")
                
                control_output = gr.Textbox(label="Control Status", interactive=False)
                
                gr.Markdown("### Detection Status")
                status_display = gr.Textbox(label="Current Status", interactive=False)
                
                gr.Markdown("### Status Legend")
                gr.Markdown("🟒 Clear | 🟠 Fire | ⚫ Smoke | πŸ”΄ Error")
            
            with gr.Column(scale=2):
                gr.Markdown("### Live Video Stream")
                video_display = gr.Image(
                    label="Video Feed",
                    height=480,
                    width=640,
                    interactive=False
                )
        
        # Update functions
        def update_display():
            frame = mcp_server.get_frame()
            status = mcp_server.get_status()
            return frame, status
        
        # Event handlers
        def start_from_upload(video_file):
            mcp_server.stop_monitoring()  # Stop current stream
            if video_file is None:
                return "❌ Please upload a video file first"
            return mcp_server.start_monitoring(video_file)
        
        def start_live_source(video_source):
            mcp_server.stop_monitoring()  # Stop current stream
            return mcp_server.start_monitoring(video_source)
        
        upload_btn.click(
            fn=start_from_upload,
            inputs=video_upload,
            outputs=control_output
        )
        
        live_btn.click(
            fn=start_live_source,
            inputs=video_input,
            outputs=control_output
        )
        
        stop_btn.click(
            fn=mcp_server.stop_monitoring,
            outputs=control_output
        )
        
        # Auto-refresh every 0.5 seconds
        timer = gr.Timer(0.5)
        timer.tick(
            fn=update_display,
            outputs=[video_display, status_display]
        )
    
    return interface

if __name__ == "__main__":
    interface = create_interface()
    interface.launch(mcp_server=True, server_port=7860, share=False)
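
# To run locally (assuming this file is saved as app.py):
#   pip install gradio opencv-python torch transformers pillow
#   python app.py
# The UI is served on port 7860; with mcp_server=True, recent Gradio releases
# also expose the app's functions as MCP tools (see the Gradio MCP docs).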