Spaces:
Sleeping
Sleeping
| from collections import Counter | |
| import math | |
| import numpy as np | |
| from PIL import Image | |
| from io import BytesIO | |
| from ultralytics import YOLO | |
| def _compute_iou(box_a, box_b): | |
| ax1, ay1, ax2, ay2 = box_a | |
| bx1, by1, bx2, by2 = box_b | |
| inter_x1 = max(ax1, bx1) | |
| inter_y1 = max(ay1, by1) | |
| inter_x2 = min(ax2, bx2) | |
| inter_y2 = min(ay2, by2) | |
| inter_w = max(0.0, inter_x2 - inter_x1) | |
| inter_h = max(0.0, inter_y2 - inter_y1) | |
| inter_area = inter_w * inter_h | |
| area_a = max(0.0, (ax2 - ax1)) * max(0.0, (ay2 - ay1)) | |
| area_b = max(0.0, (bx2 - bx1)) * max(0.0, (by2 - by1)) | |
| union = area_a + area_b - inter_area | |
| return inter_area / union if union > 0 else 0.0 | |
| def _nms_xyxy(boxes, scores, iou_threshold=0.5): | |
| if len(boxes) == 0: | |
| return [] | |
| idxs = np.argsort(scores)[::-1] | |
| keep = [] | |
| while len(idxs) > 0: | |
| i = idxs[0] | |
| keep.append(i) | |
| if len(idxs) == 1: | |
| break | |
| rest = idxs[1:] | |
| ious = np.array([_compute_iou(boxes[i], boxes[j]) for j in rest]) | |
| idxs = rest[ious <= iou_threshold] | |
| return keep | |
| class DengueDetector: | |
| def __init__(self, model_path="./models/detect.pt"): | |
| self.model = YOLO(model_path) | |
| self.names = self.model.names | |
| self.tile_size = 1024 | |
| self.default_overlap = 0.2 | |
| self.fast_max_side = 3072 | |
| self.batch_tiles = 8 | |
| try: | |
| if hasattr(self.model, "fuse"): | |
| self.model.fuse() | |
| except Exception: | |
| pass | |
| print("Modelo carregado com as seguintes classes:", self.names) | |
| def calculate_intensity(self, objects): | |
| if not objects: | |
| return 0.0 | |
| weights = { | |
| "piscina_suja": 10.0, | |
| "reservatorio_de_agua": 8.0, | |
| "pneu": 6.0, | |
| "lona": 4.0, | |
| "monte_de_lixo": 3.0, | |
| "saco_de_lixo": 2.0, | |
| "piscina_limpa": 1.0 | |
| } | |
| total_score = 0.0 | |
| first_obj = objects[0] | |
| img_w = first_obj["box"]["original_width"] | |
| img_h = first_obj["box"]["original_height"] | |
| total_img_area = float(img_w * img_h) | |
| if total_img_area == 0: | |
| for obj in objects: | |
| weight = weights.get(obj["class"], 1.0) | |
| confidence = obj["confidence"] | |
| total_score += weight * confidence | |
| return total_score | |
| for obj in objects: | |
| weight = weights.get(obj["class"], 1.0) | |
| confidence = obj["confidence"] | |
| box = obj["box"] | |
| w = box["x2"] - box["x1"] | |
| h = box["y2"] - box["y1"] | |
| obj_area = w * h | |
| relative_area = obj_area / total_img_area | |
| # risco = Peso * Confiança * Área Relativa | |
| risk_contribution = weight * confidence * relative_area | |
| total_score += risk_contribution | |
| return total_score * 100.0 | |
| def detect_image(self, image_bytes, fast: bool = True): | |
| img = Image.open(BytesIO(image_bytes)).convert("RGB") | |
| orig_width, orig_height = img.size | |
| scale = 1.0 | |
| tile_size = self.tile_size | |
| overlap = self.default_overlap | |
| if fast: | |
| max_side = max(orig_width, orig_height) | |
| if max_side > self.fast_max_side: | |
| scale = self.fast_max_side / float(max_side) | |
| new_w = max(1, int(round(orig_width * scale))) | |
| new_h = max(1, int(round(orig_height * scale))) | |
| img_resized = img.resize((new_w, new_h), resample=Image.BILINEAR) | |
| else: | |
| img_resized = img | |
| else: | |
| img_resized = img | |
| img_np = np.array(img_resized) | |
| height, width = img_np.shape[:2] | |
| stride = max(1, int(tile_size * (1 - overlap))) | |
| def compute_starts(total, size, stride): | |
| starts = list(range(0, max(total - size, 0) + 1, stride)) | |
| if len(starts) == 0: | |
| starts = [0] | |
| last = max(total - size, 0) | |
| if starts[-1] != last: | |
| starts.append(last) | |
| return starts | |
| x_starts = compute_starts(width, tile_size, stride) | |
| y_starts = compute_starts(height, tile_size, stride) | |
| tiles = [] | |
| origins = [] | |
| for y0 in y_starts: | |
| for x0 in x_starts: | |
| x1 = x0 | |
| y1 = y0 | |
| x2 = min(x0 + tile_size, width) | |
| y2 = min(y0 + tile_size, height) | |
| tile = img_np[y1:y2, x1:x2, :] | |
| if tile.size == 0: | |
| continue | |
| tiles.append(tile) | |
| origins.append((x1, y1)) | |
| all_boxes = [] | |
| all_scores = [] | |
| all_classes = [] | |
| if len(tiles) > 0: | |
| bs = max(1, int(self.batch_tiles)) | |
| for i in range(0, len(tiles), bs): | |
| batch = tiles[i:i+bs] | |
| batch_origins = origins[i:i+bs] | |
| results = self.model(batch, verbose=False) | |
| for res, (ox, oy) in zip(results, batch_origins): | |
| boxes = res.boxes | |
| if boxes is None or len(boxes) == 0: | |
| continue | |
| class_ids = boxes.cls.tolist() | |
| confidences = boxes.conf.tolist() | |
| xyxy = boxes.xyxy.cpu().numpy() if hasattr(boxes.xyxy, 'cpu') else np.array(boxes.xyxy) | |
| for j in range(len(class_ids)): | |
| bx1, by1, bx2, by2 = map(float, xyxy[j]) | |
| all_boxes.append((bx1 + ox, by1 + oy, bx2 + ox, by2 + oy)) | |
| all_scores.append(float(confidences[j])) | |
| all_classes.append(int(class_ids[j])) | |
| final_boxes = [] | |
| final_scores = [] | |
| final_classes = [] | |
| all_boxes_np = np.array(all_boxes, dtype=float) | |
| all_scores_np = np.array(all_scores, dtype=float) | |
| all_classes_np = np.array(all_classes, dtype=int) | |
| for cls in set(all_classes_np.tolist()) if len(all_classes_np) else []: | |
| cls_mask = (all_classes_np == cls) | |
| boxes_cls = all_boxes_np[cls_mask] | |
| scores_cls = all_scores_np[cls_mask] | |
| keep = _nms_xyxy(boxes_cls, scores_cls, iou_threshold=0.5) | |
| for k in keep: | |
| final_boxes.append(tuple(boxes_cls[k])) | |
| final_scores.append(float(scores_cls[k])) | |
| final_classes.append(int(cls)) | |
| detections = [] | |
| class_names = [] | |
| for b, s, c in zip(final_boxes, final_scores, final_classes): | |
| x1, y1, x2, y2 = map(float, b) | |
| if scale != 1.0: | |
| inv = 1.0 / scale | |
| x1 *= inv | |
| y1 *= inv | |
| x2 *= inv | |
| y2 *= inv | |
| cname = self.names[int(c)] | |
| if cname == "lona" and s < 0.6: | |
| continue | |
| class_names.append(cname) | |
| detections.append({ | |
| "class": cname, | |
| "confidence": round(s, 4), | |
| "box": { | |
| "x1": x1, "y1": y1, "x2": x2, "y2": y2, | |
| "original_width": orig_width, "original_height": orig_height | |
| } | |
| }) | |
| counts = Counter(class_names) | |
| intensity_score = self.calculate_intensity(detections) | |
| return { | |
| "total": len(detections), | |
| "contagem": counts, | |
| "objetos": detections, | |
| "intensity_score": intensity_score | |
| } |