Spaces:
Runtime error
Runtime error
| import os | |
| from fastapi import FastAPI, HTTPException, Header, UploadFile, File | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import gradio as gr | |
| from typhoon_ocr import ocr_document | |
| from pdf2image import convert_from_bytes | |
| from PIL import Image | |
| import re | |
| from dotenv import load_dotenv | |
# --- Environment ---
# Read variables from a local .env file before any setting below is resolved.
load_dotenv()

# --- Settings ---
# API_KEY guards the HTTP endpoint; the TYPHOON_* values configure the OCR backend.
API_KEY = os.environ.get("API_KEY")
TYPHOON_API_KEY = os.environ.get("TYPHOON_OCR_API_KEY")
TYPHOON_BASE_URL = os.environ.get("TYPHOON_BASE_URL", "https://api.opentyphoon.ai/v1")

# --- FastAPI application ---
app = FastAPI()

# Wide-open CORS (any origin, method and header) so the API can be called publicly.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Field name -> compiled, case-insensitive pattern. Group 1 of every pattern is
# the value to extract. Compiled once at import time instead of on every call.
_FIELD_PATTERNS = {
    field: re.compile(pattern, re.IGNORECASE)
    for field, pattern in {
        "tax_id": r"(?:TAX\s*ID|เลขที่ผู้เสียภาษี)[\s:\-\.]*([\d]{10,13})",
        "tax_invoice": r"(?:TAX\s*INV\.?|เลขที่ใบกำกับภาษี|ใบกำกับ)[\s:\-\.]*([\dA-Z\-\/]{6,20})",
        # Each component accepts up to four digits so both year-first and
        # year-last dates are captured whole (a 1-2 digit tail truncated
        # "25/12/2024" to "25/12/20").
        "tax_date": r"(?:DATE|วันที่|ออกใบกำกับวันที่)?[\s:\-\.]*(\d{1,4}/\d{1,2}/\d{1,4})",
        "amount": r"(?:AMOUNT\s*THB|จำนวนเงิน|รวมเงิน)[\s:\-\.]*([\d,]+\.\d{2})",
        "baht_per_litre": r"(?:Baht\/Litr\.?|Bath\/Ltr\.?|ราคาต่อลิตร|ราคา\/ลิตร|ราคาน้ำมัน|บาทต่อลิตร)[\s:\-\.]*([\d,]+\.\d{2})",
        # The lookbehinds skip the "Ltr"/"ลิตร" that is part of a price-per-litre
        # label ("Bath/Ltr.", "...ต่อลิตร"); otherwise the unit price would be
        # reported as the litre quantity.
        "litre": r"(?<!/)(?<!ต่อ)(?:Ltr\.?|Ltrs?\.?|ลิตร)[\s:\-\.]*([\d,]+\.\d{2,3})",
        "vat": r"(?:VAT|ภาษีมูลค่าเพิ่ม)[\s:\-\.]*([\d,]+\.\d{2})",
        "total": r"(?:TOTAL\s*THB|ยอดรวม|รวมทั้งสิ้น|รวมเงินทั้งสิ้น|ยอดเงินสุทธิ)[\s:\-\.]*([\d,]+\.\d{2})",
    }.items()
}


def extract_fields_regex(text: str) -> dict:
    """Extract receipt fields from OCR text using label-anchored regexes.

    The text is normalised first (markup tags removed, whitespace collapsed),
    then each pattern is searched once and the first match wins.

    Args:
        text: Raw OCR output; may contain HTML-like tags. ``None`` or an empty
            string is treated as "no text".

    Returns:
        A dict with the keys ``tax_id``, ``tax_invoice``, ``tax_date``,
        ``amount``, ``baht_per_litre``, ``litre``, ``vat`` and ``total``.
        Each value is the matched string, or ``None`` when the field was not
        found.
    """
    text = text or ""

    # Replace tags with a space rather than nothing, so the contents of
    # adjacent table cells are not fused into one token.
    text = re.sub(r"<.*?>", " ", text)
    text = re.sub(r"\n+", "\n", text)      # Collapse blank lines
    text = re.sub(r"\s{2,}", " ", text)    # Collapse whitespace runs
    text = re.sub(r"\t+", " ", text)       # Remaining single tabs

    results = {}
    for field, pattern in _FIELD_PATTERNS.items():
        match = pattern.search(text)
        results[field] = match.group(1).strip() if match else None
    return results
def pdf_to_image(file_bytes: bytes) -> Image.Image:
    """Render the first page of a PDF as a PIL image.

    Args:
        file_bytes: The raw bytes of the PDF document.

    Returns:
        The first page as a ``PIL.Image.Image``.

    Raises:
        ValueError: If the conversion produced no pages.
    """
    # Only page 1 is ever used, so do not rasterise the rest of the document.
    pages = convert_from_bytes(file_bytes, first_page=1, last_page=1)
    if not pages:
        raise ValueError("PDF contains no pages")
    return pages[0]
# --- API Endpoint ---
# NOTE(review): the handler was previously defined without a route decorator,
# so it was never reachable. The "/ocr" path is a reviewer choice; rename it if
# existing clients expect a different route.
@app.post("/ocr")
async def ocr_receipt(
    file: UploadFile = File(...),
    x_api_key: str | None = Header(None),
):
    """Run OCR on an uploaded receipt and extract its structured fields.

    Args:
        file: The uploaded receipt; a ``.pdf`` filename is rendered to an
            image first, anything else is passed to the OCR call as-is.
        x_api_key: Value of the ``X-API-Key`` header. Checked only when the
            ``API_KEY`` setting is configured.

    Returns:
        A dict with ``raw_ocr`` (the OCR text) and ``extracted_fields`` (the
        result of ``extract_fields_regex``).

    Raises:
        HTTPException: 401 for a missing/invalid API key, 400 for an empty
            upload, 500 when OCR or field extraction fails.
    """
    import secrets  # local import: only the key check needs it

    if API_KEY:
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        supplied = (x_api_key or "").encode("utf-8")
        if not secrets.compare_digest(supplied, API_KEY.encode("utf-8")):
            raise HTTPException(status_code=401, detail="Invalid API key")

    content = await file.read()
    if not content:
        raise HTTPException(status_code=400, detail="Uploaded file is empty")

    # The filename may be absent on some uploads; treat that as "not a PDF".
    is_pdf = (file.filename or "").lower().endswith(".pdf")

    try:
        # NOTE(review): ocr_document receives a PIL image for PDFs and raw
        # bytes otherwise -- confirm typhoon_ocr accepts both input kinds.
        if is_pdf:
            raw_output = ocr_document(pdf_to_image(content), task_type="structure")
        else:
            raw_output = ocr_document(content, task_type="structure")
        text = raw_output if isinstance(raw_output, str) else raw_output.get("text", "")
        extracted = extract_fields_regex(text)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "raw_ocr": text,
        "extracted_fields": extracted,
    }
# --- Gradio UI ---
def gradio_interface(image_path: str | Image.Image):
    """Gradio callback: OCR the uploaded file and extract receipt fields.

    Args:
        image_path: The value supplied by the upload component. A string
            ending in ``.pdf`` is read from disk and rendered to an image;
            any other value is handed to the OCR call unchanged.

    Returns:
        A ``(text, fields)`` tuple: the OCR text and the dict produced by
        ``extract_fields_regex``.

    Raises:
        gr.Error: If nothing was uploaded.
    """
    # Clicking the button with no upload passes None; report that clearly
    # instead of failing inside the OCR call.
    if image_path is None:
        raise gr.Error("กรุณาอัปโหลดไฟล์ PDF หรือรูปภาพ")

    if isinstance(image_path, str) and image_path.lower().endswith(".pdf"):
        with open(image_path, "rb") as f:
            image = pdf_to_image(f.read())
    else:
        image = image_path

    raw = ocr_document(image, task_type="structure")
    text = raw if isinstance(raw, str) else raw.get("text", "")
    extracted = extract_fields_regex(text)
    return text, extracted
# UI layout: one row with the upload and both outputs, then the action button.
# NOTE(review): the nesting under gr.Row() is a reviewer choice -- confirm it
# matches the intended layout.
with gr.Blocks() as demo:
    gr.Markdown("# 🧾 แปลงและตรวจสอบใบเสร็จ")
    with gr.Row():
        img = gr.Image(type="filepath", label="อัปโหลดไฟล์ PDF หรือรูปภาพ")
        out_text = gr.Textbox(label="ข้อความทั้งหมด", lines=10)
        out_fields = gr.JSON(label="ข้อความที่ดึงออกมา")
    btn = gr.Button("ประมวลผลใบเสร็จ")
    # Event listeners must be registered inside the Blocks context.
    btn.click(fn=gradio_interface, inputs=img, outputs=[out_text, out_fields])

# NOTE: demo.launch() serves only the Gradio UI; the FastAPI `app` is not
# exposed by it. To serve both, mount the UI on the API with
# gr.mount_gradio_app(app, demo, path="/ui") and run `app` under an ASGI server.
if __name__ == "__main__":
    # Guarded so that importing this module does not start a blocking server.
    demo.launch(share=False)