Spaces:
Sleeping
Sleeping
| """ | |
| 🤗 SkladBot Free AI Microservice | |
| Hugging Face Spaces микросервис для БЕСПЛАТНОЙ обработки складских документов | |
| Возможности: | |
| - TrOCR для печатного и рукописного текста | |
| - LayoutLM для понимания структуры документов | |
| - Table Transformer для обработки таблиц | |
| - Gradio API для REST запросов | |
| - 100% БЕСПЛАТНО - 20k запросов/месяц | |
| """ | |
| import gradio as gr | |
| import torch | |
| import numpy as np | |
| from PIL import Image | |
| import io | |
| import base64 | |
| import json | |
| import re | |
| from datetime import datetime | |
| from typing import Dict, List, Any, Optional | |
| # Transformers models | |
| from transformers import ( | |
| TrOCRProcessor, VisionEncoderDecoderModel, | |
| pipeline, | |
| AutoTokenizer, AutoModelForTokenClassification | |
| ) | |
| # Импортируем наш кастомный токенайзер | |
| from custom_tokenizers import Byt5LangTokenizer | |
| # Регистрируем кастомный токенайзер в transformers | |
| from transformers import AutoConfig, AutoTokenizer | |
| from transformers.models.auto.tokenization_auto import TOKENIZER_MAPPING | |
| from transformers.tokenization_utils_base import TOKENIZER_CONFIG_FILE | |
| # Регистрация кастомного токенайзера | |
| if 'Byt5LangTokenizer' not in dir(): | |
| try: | |
| # Добавляем токенайзер в систему автоматического обнаружения transformers | |
| from transformers.models.auto.tokenization_auto import TOKENIZER_MAPPING, TOKENIZER_MAPPING_NAMES | |
| from transformers.tokenization_utils_base import TOKENIZER_CONFIG_FILE | |
| print("🔄 Регистрируем кастомный токенайзер Byt5LangTokenizer...") | |
| except ImportError as e: | |
| print(f"⚠️ Предупреждение при импорте модулей трансформеров: {e}") | |
| class FreeAIOrchestrator: | |
| """Координатор БЕСПЛАТНЫХ AI сервисов для складских документов""" | |
| def __init__(self): | |
| print("🚀 Инициализация SkladBot Free AI...") | |
| # TrOCR для печатного текста (БЕСПЛАТНО) | |
| self.printed_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-printed") | |
| self.printed_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-printed") | |
| # TrOCR для рукописного текста (БЕСПЛАТНО) | |
| self.handwritten_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-handwritten") | |
| self.handwritten_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-handwritten") | |
| # LayoutLM для понимания документов (БЕСПЛАТНО) | |
| self.document_qa = pipeline( | |
| "document-question-answering", | |
| model="impira/layoutlm-document-qa" | |
| ) | |
| # Table Transformer для таблиц (БЕСПЛАТНО) | |
| self.table_detector = pipeline( | |
| "object-detection", | |
| model="microsoft/table-transformer-structure-recognition" | |
| ) | |
| # NEW: Добавляем интеграцию с Surya Table (БЕСПЛАТНО) | |
| try: | |
| # Регистрируем кастомный токенайзер перед загрузкой модели | |
| print("🔄 Инициализация кастомного токенайзера для Surya Table...") | |
| # Используем пайплайн с указанием нашего токенайзера | |
| self.surya_table_model = TableRecPredictor() | |
| print("✅ Surya Table модель загружена успешно") | |
| self.surya_table_available = True | |
| except Exception as e: | |
| print(f"⚠️ Не удалось загрузить Surya Table: {e}") | |
| self.surya_table_available = False | |
| self.stats = { | |
| "total_requests": 0, | |
| "successful_extractions": 0, | |
| "avg_confidence": 0.0, | |
| "start_time": datetime.now() | |
| } | |
| print("✅ SkladBot Free AI готов к работе!") | |
| async def extract_warehouse_data(self, image, document_type="auto"): | |
| """Главная функция - извлечение данных из складских документов""" | |
| self.stats["total_requests"] += 1 | |
| try: | |
| # Конвертация изображения | |
| if isinstance(image, str): | |
| # Base64 строка | |
| image_data = base64.b64decode(image) | |
| image = Image.open(io.BytesIO(image_data)) | |
| # 1. Определение типа документа | |
| doc_type = await self.classify_document_type(image) | |
| if document_type != "auto": | |
| doc_type = document_type | |
| # 2. Выбор стратегии обработки | |
| extraction_results = [] | |
| # TrOCR для печатного текста | |
| printed_text = await self.extract_printed_text(image) | |
| extraction_results.append({ | |
| "method": "trocr_printed", | |
| "text": printed_text, | |
| "confidence": 0.85 | |
| }) | |
| # TrOCR для рукописного текста (если нужно) | |
| if doc_type in ["handwritten", "mixed"]: | |
| handwritten_text = await self.extract_handwritten_text(image) | |
| extraction_results.append({ | |
| "method": "trocr_handwritten", | |
| "text": handwritten_text, | |
| "confidence": 0.80 | |
| }) | |
| # LayoutLM для структурированного понимания | |
| if doc_type in ["invoice", "table", "form"]: | |
| structured_data = await self.extract_structured_data(image, doc_type) | |
| extraction_results.append({ | |
| "method": "layoutlm", | |
| "data": structured_data, | |
| "confidence": 0.90 | |
| }) | |
| # Table Transformer для таблиц | |
| if doc_type == "table": | |
| table_data = await self.extract_table_data(image) | |
| extraction_results.append({ | |
| "method": "table_transformer", | |
| "data": table_data, | |
| "confidence": 0.88 | |
| }) | |
| # 3. Объединение и обработка результатов | |
| final_result = await self.merge_extraction_results(extraction_results, doc_type) | |
| # 4. Парсинг складских команд | |
| warehouse_commands = await self.parse_warehouse_commands(final_result) | |
| # 5. Генерация предложений | |
| suggestions = await self.generate_smart_suggestions(warehouse_commands) | |
| self.stats["successful_extractions"] += 1 | |
| return { | |
| "success": True, | |
| "document_type": doc_type, | |
| "extracted_items": warehouse_commands, | |
| "suggestions": suggestions, | |
| "raw_extractions": extraction_results, | |
| "confidence": self.calculate_overall_confidence(extraction_results), | |
| "processing_time": f"{datetime.now().timestamp():.2f}s", | |
| "cost": 0.0 # ВСЕГДА БЕСПЛАТНО! | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": str(e), | |
| "document_type": "unknown", | |
| "extracted_items": [], | |
| "suggestions": [], | |
| "confidence": 0.0, | |
| "cost": 0.0 | |
| } | |
| async def extract_printed_text(self, image): | |
| """Извлечение печатного текста через TrOCR""" | |
| try: | |
| pixel_values = self.printed_processor(image, return_tensors="pt").pixel_values | |
| generated_ids = self.printed_model.generate(pixel_values) | |
| generated_text = self.printed_processor.batch_decode(generated_ids, skip_special_tokens=True)[0] | |
| return generated_text | |
| except Exception as e: | |
| print(f"❌ Ошибка TrOCR печатный: {e}") | |
| return "" | |
| async def extract_handwritten_text(self, image): | |
| """Извлечение рукописного текста через TrOCR""" | |
| try: | |
| pixel_values = self.handwritten_processor(image, return_tensors="pt").pixel_values | |
| generated_ids = self.handwritten_model.generate(pixel_values) | |
| generated_text = self.handwritten_processor.batch_decode(generated_ids, skip_special_tokens=True)[0] | |
| return generated_text | |
| except Exception as e: | |
| print(f"❌ Ошибка TrOCR рукописный: {e}") | |
| return "" | |
| async def extract_structured_data(self, image, doc_type): | |
| """Структурированное понимание документа через LayoutLM""" | |
| try: | |
| # Определяем вопросы на основе типа документа | |
| questions = self.get_document_questions(doc_type) | |
| results = {} | |
| for question in questions: | |
| try: | |
| result = self.document_qa(image=image, question=question) | |
| results[question] = result["answer"] | |
| except: | |
| results[question] = "" | |
| return results | |
| except Exception as e: | |
| print(f"❌ Ошибка LayoutLM: {e}") | |
| return {} | |
| async def extract_table_data(self, image): | |
| """Извлечение табличных данных через специализированные модели""" | |
| try: | |
| # Проверка наличия модели Surya Table | |
| if hasattr(self, 'surya_table_available') and self.surya_table_available: | |
| try: | |
| # Попытка использования Surya Table для структурированного распознавания таблиц | |
| print("🔍 Используем Surya Table для структурированного распознавания таблицы...") | |
| # Преобразуем PIL Image в формат, необходимый для модели | |
| if isinstance(image, str): | |
| # Если передан путь или base64 | |
| if image.startswith('data:image'): | |
| # Обработка base64 | |
| image_data = base64.b64decode(image.split(',')[1]) | |
| pil_image = Image.open(io.BytesIO(image_data)) | |
| else: | |
| # Обработка пути к файлу | |
| pil_image = Image.open(image) | |
| elif isinstance(image, Image.Image): | |
| pil_image = image | |
| else: | |
| # Если передан bytes | |
| pil_image = Image.open(io.BytesIO(image)) | |
| # Распознаем таблицу через Surya Table | |
| table_result = self.surya_table_model([pil_image]) | |
| # Преобразуем результат в структурированный формат | |
| try: | |
| # Результат может быть в разных форматах | |
| if isinstance(table_result, list) and len(table_result) > 0: | |
| if isinstance(table_result[0], dict) and 'cells' in table_result[0]: | |
| table_data = self._parse_surya_table(table_result[0]) | |
| else: | |
| table_data = [] | |
| else: | |
| table_data = [] | |
| return { | |
| "success": True, | |
| "type": "table", | |
| "model": "surya_table", | |
| "rows": table_data, | |
| "raw_text": "", | |
| "confidence": 0.95 | |
| } | |
| except Exception as parse_error: | |
| print(f"⚠️ Ошибка парсинга результата Surya Table: {parse_error}") | |
| # Продолжаем с запасным вариантом | |
| except Exception as surya_error: | |
| print(f"⚠️ Ошибка Surya Table, используем запасной вариант: {surya_error}") | |
| # Запасной вариант: Table Transformer для определения местоположения таблиц | |
| print("🔍 Используем Table Transformer для определения местоположения таблиц...") | |
| table_detection = self.table_detector(image) | |
| # Если обнаружены таблицы, используем TrOCR для извлечения текста из них | |
| table_data = [] | |
| for detection in table_detection: | |
| if detection["label"] == "table" and detection["score"] > 0.7: | |
| # Вырезаем область таблицы | |
| table_data.append({ | |
| "box": detection["box"], | |
| "score": detection["score"], | |
| "type": "table" | |
| }) | |
| # Извлекаем текст из таблиц через TrOCR | |
| if table_data: | |
| for table in table_data: | |
| # Здесь можно добавить код для вырезания области таблицы и применения TrOCR | |
| pass | |
| return table_data | |
| except Exception as e: | |
| print(f"❌ Ошибка распознавания таблицы: {e}") | |
| return [] | |
| def _parse_surya_table(self, surya_result): | |
| """Парсинг результата Surya в структурированные данные""" | |
| rows = [] | |
| headers = {} | |
| # First pass: get headers | |
| for cell in surya_result.get('cells', []): | |
| if cell.get('is_header'): | |
| headers[cell.get('col_id')] = cell.get('text', '').lower() | |
| # Second pass: get rows | |
| row_dict = {} | |
| for cell in surya_result.get('cells', []): | |
| row_id = cell.get('row_id') | |
| if row_id not in row_dict: | |
| row_dict[row_id] = {} | |
| col_id = cell.get('col_id') | |
| header = headers.get(col_id, str(col_id)) | |
| value = cell.get('text', '') | |
| # Преобразуем заголовки к стандартным полям | |
| if 'товар' in header or 'название' in header or 'наимен' in header: | |
| row_dict[row_id]['name'] = value | |
| elif 'кол' in header or 'шт' in header: | |
| try: | |
| # Извлекаем числовое значение | |
| quantity = re.search(r'(\d+(?:\.\d+)?)', value) | |
| if quantity: | |
| row_dict[row_id]['quantity'] = float(quantity.group(1)) | |
| else: | |
| row_dict[row_id]['quantity'] = value | |
| except: | |
| row_dict[row_id]['quantity'] = value | |
| elif 'арт' in header: | |
| row_dict[row_id]['article'] = value | |
| elif 'цен' in header: | |
| # Извлекаем числовое значение цены | |
| price = re.search(r'(\d+(?:\.\d+)?)', value) | |
| if price: | |
| row_dict[row_id]['price'] = float(price.group(1)) | |
| else: | |
| row_dict[row_id]['price'] = value | |
| else: | |
| # Для прочих колонок используем оригинальное название | |
| row_dict[row_id][header] = value | |
| return list(row_dict.values()) | |
| def _parse_table_text(self, table_text): | |
| """Парсинг текста таблицы в структурированные данные""" | |
| rows = [] | |
| try: | |
| # Разбиваем на строки | |
| lines = table_text.strip().split('\n') | |
| # Определяем заголовки (первая строка) | |
| if lines: | |
| headers = self._extract_columns(lines[0]) | |
| # Обрабатываем строки данных | |
| for i in range(1, len(lines)): | |
| row_data = {} | |
| columns = self._extract_columns(lines[i]) | |
| # Сопоставляем значения с заголовками | |
| for j, value in enumerate(columns): | |
| if j < len(headers): | |
| header = headers[j].lower() | |
| # Преобразуем заголовки к стандартным полям | |
| if 'товар' in header or 'название' in header or 'наимен' in header: | |
| row_data['name'] = value | |
| elif 'кол' in header or 'шт' in header: | |
| try: | |
| # Извлекаем числовое значение | |
| quantity = re.search(r'(\d+(?:\.\d+)?)', value) | |
| if quantity: | |
| row_data['quantity'] = float(quantity.group(1)) | |
| else: | |
| row_data['quantity'] = value | |
| except: | |
| row_data['quantity'] = value | |
| elif 'арт' in header: | |
| row_data['article'] = value | |
| elif 'цен' in header: | |
| # Извлекаем числовое значение цены | |
| price = re.search(r'(\d+(?:\.\d+)?)', value) | |
| if price: | |
| row_data['price'] = float(price.group(1)) | |
| else: | |
| row_data['price'] = value | |
| else: | |
| # Для прочих колонок используем оригинальное название | |
| row_data[header] = value | |
| if row_data: | |
| rows.append(row_data) | |
| except Exception as e: | |
| print(f"⚠️ Ошибка парсинга таблицы: {e}") | |
| return rows | |
| def _extract_columns(self, line): | |
| """Извлечение колонок из строки таблицы""" | |
| # Простое разделение по табуляции или нескольким пробелам | |
| return re.split(r'\t| +', line.strip()) | |
| async def merge_extraction_results(self, extraction_results, doc_type): | |
| """Объединение результатов разных AI методов""" | |
| merged_text = "" | |
| structured_data = {} | |
| for result in extraction_results: | |
| if "text" in result: | |
| merged_text += f"{result['text']} " | |
| if "data" in result and isinstance(result["data"], dict): | |
| structured_data.update(result["data"]) | |
| return { | |
| "combined_text": merged_text.strip(), | |
| "structured_data": structured_data, | |
| "document_type": doc_type | |
| } | |
| async def parse_warehouse_commands(self, extraction_result): | |
| """Парсинг складских команд из извлеченного текста""" | |
| text = extraction_result.get("combined_text", "") | |
| # Используем NER для извлечения сущностей | |
| try: | |
| entities = self.ner_pipeline(text) | |
| except: | |
| entities = [] | |
| # Регулярные выражения для складских данных | |
| warehouse_items = [] | |
| # Поиск артикулов (F186, ST9, F186ST9) | |
| article_pattern = r'\b(?:F\d+(?:ST\d+)?|ST\d+)\b' | |
| articles = re.findall(article_pattern, text, re.IGNORECASE) | |
| # Поиск количеств | |
| quantity_pattern = r'\b(\d+)\s*(?:шт|лист|листов|кг|м2|м²)\b' | |
| quantities = re.findall(quantity_pattern, text, re.IGNORECASE) | |
| # Поиск цен | |
| price_pattern = r'\b(\d+(?:\.\d+)?)\s*(?:руб|₽|р)\b' | |
| prices = re.findall(price_pattern, text, re.IGNORECASE) | |
| # Объединение найденных данных | |
| max_items = max(len(articles), len(quantities), 1) | |
| for i in range(max_items): | |
| item = { | |
| "article": articles[i] if i < len(articles) else "", | |
| "quantity": int(quantities[i]) if i < len(quantities) else 0, | |
| "price": float(prices[i]) if i < len(prices) else 0.0, | |
| "name": self.extract_product_name(text, i), | |
| "confidence": 0.8 | |
| } | |
| if item["article"] or item["quantity"] > 0: | |
| warehouse_items.append(item) | |
| return warehouse_items | |
| def extract_product_name(self, text, index=0): | |
| """Извлечение названия товара из текста""" | |
| # Простая эвристика для извлечения названий | |
| words = text.split() | |
| # Ищем слова после артикулов или количеств | |
| product_keywords = ["лдсп", "мдф", "фанера", "дуб", "бук", "ясень", "орех", "чикаго"] | |
| for word in words: | |
| if any(keyword in word.lower() for keyword in product_keywords): | |
| return word.title() | |
| return "Товар" | |
| async def generate_smart_suggestions(self, warehouse_items): | |
| """Генерация умных предложений""" | |
| suggestions = [] | |
| for item in warehouse_items: | |
| if not item["article"]: | |
| suggestions.append({ | |
| "type": "missing_article", | |
| "message": f"Не найден артикул для товара '{item['name']}'", | |
| "action": "manual_input", | |
| "priority": "high" | |
| }) | |
| if item["quantity"] == 0: | |
| suggestions.append({ | |
| "type": "missing_quantity", | |
| "message": f"Не найдено количество для '{item['article'] or item['name']}'", | |
| "action": "manual_input", | |
| "priority": "medium" | |
| }) | |
| if item["price"] == 0: | |
| suggestions.append({ | |
| "type": "missing_price", | |
| "message": f"Не найдена цена для '{item['article'] or item['name']}'", | |
| "action": "suggest_price", | |
| "priority": "low" | |
| }) | |
| return suggestions | |
| def calculate_overall_confidence(self, extraction_results): | |
| """Расчет общей уверенности""" | |
| if not extraction_results: | |
| return 0.0 | |
| total_confidence = sum(result.get("confidence", 0) for result in extraction_results) | |
| return round(total_confidence / len(extraction_results), 2) | |
| def get_stats(self): | |
| """Статистика работы микросервиса""" | |
| return { | |
| "quota_used": f"{self.stats['total_requests']}/20000", | |
| "uptime_hours": (datetime.now() - self.stats["start_time"]).total_seconds() / 3600, | |
| "models_loaded": ["TrOCR", "LayoutLM", "TableTransformer", "RuBERT-NER", "SuryaTable"], | |
| "success_rate": self._calculate_success_rate() | |
| } | |
| def _calculate_success_rate(self): | |
| """Расчет успешного процента""" | |
| if self.stats["total_requests"] == 0: | |
| return 0.0 | |
| return round(self.stats["successful_extractions"] / self.stats["total_requests"] * 100, 1) | |
| # Инициализация AI | |
| ai_orchestrator = FreeAIOrchestrator() | |
| # Gradio интерфейс | |
| def process_warehouse_document(image, document_type): | |
| """Обработка складского документа через Gradio""" | |
| try: | |
| import asyncio | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| result = loop.run_until_complete( | |
| ai_orchestrator.extract_warehouse_data(image, document_type) | |
| ) | |
| return json.dumps(result, ensure_ascii=False, indent=2) | |
| except Exception as e: | |
| return json.dumps({ | |
| "success": False, | |
| "error": f"Ошибка обработки: {str(e)}", | |
| "cost": 0.0 | |
| }, ensure_ascii=False, indent=2) | |
| def get_service_stats(): | |
| """Получение статистики сервиса""" | |
| stats = ai_orchestrator.get_stats() | |
| return json.dumps(stats, ensure_ascii=False, indent=2) | |
| # Gradio интерфейс | |
| with gr.Blocks(title="SkladBot Free AI") as app: | |
| gr.Markdown("# 🤖 SkladBot Free AI Microservice") | |
| gr.Markdown("**БЕСПЛАТНАЯ** обработка складских документов через AI") | |
| with gr.Tab("Обработка документов"): | |
| image_input = gr.Image(type="pil", label="Загрузите изображение документа") | |
| doc_type = gr.Dropdown( | |
| choices=["auto", "invoice", "table", "form", "handwritten"], | |
| value="auto", | |
| label="Тип документа" | |
| ) | |
| process_btn = gr.Button("🔍 Обработать документ", variant="primary") | |
| result_output = gr.Textbox( | |
| label="Результат обработки", | |
| lines=20, | |
| max_lines=30 | |
| ) | |
| process_btn.click( | |
| process_warehouse_document, | |
| inputs=[image_input, doc_type], | |
| outputs=result_output | |
| ) | |
| with gr.Tab("Статистика"): | |
| stats_btn = gr.Button("📊 Обновить статистику") | |
| stats_output = gr.Textbox( | |
| label="Статистика сервиса", | |
| lines=10 | |
| ) | |
| stats_btn.click( | |
| get_service_stats, | |
| outputs=stats_output | |
| ) | |
| gr.Markdown("---") | |
| gr.Markdown("💰 **Стоимость**: $0 (100% бесплатно)") | |
| gr.Markdown("📊 **Лимит**: 20,000 запросов/месяц") | |
| gr.Markdown("🧠 **AI модели**: TrOCR, LayoutLM, Table Transformer, RuBERT, SuryaTable") | |
| if __name__ == "__main__": | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=True, | |
| show_error=True | |
| ) | |