import os import shutil from tempfile import TemporaryDirectory, NamedTemporaryFile from typing import List, Union, Optional, Tuple, Dict, Any, Generator from pathlib import Path import torch import gradio as gr from huggingface_hub import ( CommitOperationAdd, HfApi, ModelCard, Discussion, CommitInfo, create_repo, RepoUrl, ) from huggingface_hub.file_download import repo_folder_name from optimum.exporters.tasks import TasksManager from optimum.exporters.neuron.model_configs import * from optimum.neuron import ( NeuronModelForFeatureExtraction, NeuronModelForSentenceTransformers, NeuronModelForMaskedLM, NeuronModelForQuestionAnswering, NeuronModelForSequenceClassification, NeuronModelForTokenClassification, NeuronModelForMultipleChoice, NeuronModelForImageClassification, NeuronModelForSemanticSegmentation, NeuronModelForObjectDetection, NeuronModelForAudioClassification, NeuronModelForAudioFrameClassification, NeuronModelForCTC, NeuronModelForXVector, NeuronModelForCausalLM, NeuronModelForSeq2SeqLM, ) from optimum.neuron import ( NeuronDiffusionPipelineBase, NeuronStableDiffusionPipeline, NeuronStableDiffusionImg2ImgPipeline, NeuronStableDiffusionInpaintPipeline, NeuronStableDiffusionInstructPix2PixPipeline, NeuronLatentConsistencyModelPipeline, NeuronStableDiffusionXLPipeline, NeuronStableDiffusionXLImg2ImgPipeline, NeuronStableDiffusionXLInpaintPipeline, NeuronStableDiffusionControlNetPipeline, NeuronStableDiffusionXLControlNetPipeline, NeuronPixArtAlphaPipeline, NeuronPixArtSigmaPipeline, NeuronFluxPipeline ) from optimum.neuron.cache import synchronize_hub_cache from synchronizer import synchronize_hub_cache_with_pr from optimum.exporters.neuron import main_export, build_stable_diffusion_components_mandatory_shapes SPACES_URL = "https://huggingface.co/spaces/optimum/neuron-export" CUSTOM_CACHE_REPO = os.getenv("CUSTOM_CACHE_REPO") HF_TOKEN = os.environ.get("HF_TOKEN") # Task to NeuronModel mapping for transformers TASK_TO_MODEL_CLASS = { "feature-extraction": NeuronModelForFeatureExtraction, "sentence-transformers": NeuronModelForSentenceTransformers, "fill-mask": NeuronModelForMaskedLM, "question-answering": NeuronModelForQuestionAnswering, "text-classification": NeuronModelForSequenceClassification, "token-classification": NeuronModelForTokenClassification, "multiple-choice": NeuronModelForMultipleChoice, "image-classification": NeuronModelForImageClassification, "semantic-segmentation": NeuronModelForSemanticSegmentation, "object-detection": NeuronModelForObjectDetection, "audio-classification": NeuronModelForAudioClassification, "audio-frame-classification": NeuronModelForAudioFrameClassification, "automatic-speech-recognition": NeuronModelForCTC, "audio-xvector": NeuronModelForXVector, "text-generation": NeuronModelForCausalLM, "text2text-generation": NeuronModelForSeq2SeqLM, } # Diffusion pipeline mapping DIFFUSION_PIPELINE_MAPPING = { "text-to-image": StableDiffusionPipeline, "image-to-image": StableDiffusionImg2ImgPipeline, "inpaint": StableDiffusionInpaintPipeline, "instruct-pix2pix": StableDiffusionInstructPix2PixPipeline, "latent-consistency": LatentConsistencyModelPipeline, "stable-diffusion": StableDiffusionPipeline, "stable-diffusion-xl": StableDiffusionXLPipeline, "stable-diffusion-xl-img2img": StableDiffusionXLImg2ImgPipeline, "stable-diffusion-xl-inpaint": StableDiffusionXLInpaintPipeline, "controlnet": StableDiffusionControlNetPipeline, "controlnet-xl": StableDiffusionXLControlNetPipeline, "pixart-alpha": PixArtAlphaPipeline, "pixart-sigma": PixArtSigmaPipeline, "flux": FluxPipeline, } ENCODER_TASKS = {"feature-extraction","sentence-transformers","fill-mask","question-answering","text-classification","token-classification","multiple-choice","image-classification","semantic-segmentation","object-detection","audio-classification","audio-frame-classification","automatic-speech-recognition","audio-xvector"} DECODER_TASKS = {"text-generation"} SEQ2SEQ_TAKS = {"text2text-generation"} def get_default_inputs(task_or_pipeline: str) -> Dict[str, int]: """Get default input shapes based on task type or diffusion pipeline type.""" if task_or_pipeline in ["feature-extraction", "sentence-transformers", "fill-mask", "question-answering", "text-classification", "token-classification","text-generation"]: return {"batch_size": 1, "sequence_length": 128} elif task_or_pipeline == "multiple-choice": return {"batch_size": 1, "num_choices": 4, "sequence_length": 128} elif task_or_pipeline == "text2text-generation": return {"batch_size": 1, "sequence_length": 128, "num_beams":4} elif task_or_pipeline in ["image-classification", "semantic-segmentation", "object-detection"]: return {"batch_size": 1, "num_channels": 3, "height": 224, "width": 224} elif task_or_pipeline in ["audio-classification", "audio-frame-classification", "audio-xvector"]: return {"batch_size": 1, "audio_sequence_length": 16000} elif task_or_pipeline in DIFFUSION_PIPELINE_MAPPING: return {"batch_size": 1, "height": 1024, "width": 1024, "num_images_per_prompt": 1, "torch_dtype":torch.bfloat16} else: # Default to text-based shapes return {"batch_size": 1, "sequence_length": 128} def previous_pr(api: "HfApi", model_id: str, pr_title: str) -> Optional["Discussion"]: try: discussions = api.get_repo_discussions(repo_id=model_id) except Exception: return None for discussion in discussions: if ( discussion.status == "open" and discussion.is_pull_request and discussion.title == pr_title ): return discussion return None def export(model_id: str, task_or_pipeline:str, model_type: str, folder: str): yield f"📦 Exporting model `{model_id}` for task `{task_or_pipeline}`..." if model_type == "diffusers": model_class = DIFFUSION_PIPELINE_MAPPING.get(task_or_pipeline) inputs = get_default_inputs(task_or_pipeline) yield f"🔧 Using default inputs: {inputs}" if task_or_pipeline in ENCODER_TASKS or SEQ2SEQ_TAKS: result = main_export( model_name_or_path=model_id, output=folder, token=HF_TOKEN, task=task_or_pipeline, cpu_backend=True, do_validation=False, **inputs, ) if task_or_pipeline in DECODER_TASKS: neuron_config = NeuronModelForCausalLM.get_neuron_config(model_name_or_path=model_id, **inputs) neuron_model = NeuronModelForCausalLM.export( model_id=export_decoder_id, neuron_config=neuron_config, token = HF_TOKEN, ) model.save_pretrained(folder) if task_or_pipeline in DIFFUSION_PIPELINE_MAPPING: model = model_class.from_pretrained(model_id) input_shapes = build_stable_diffusion_components_mandatory_shapes(**inputs) compiler_kwargs = {"auto_cast": "matmul", "auto_cast_type": "bf16"} result = main_export( model_name_or_path=model_id, output=folder, compiler_kwargs=compiler_kwargs, token=HF_TOKEN, library_name=model_type, cpu_backend=True, model=model, **input_shapes, ) def export_and_git_add(model_id: str, task_or_pipeline: str, model_type: str, folder: str, token: str) -> Any: try: export(model_id, task_or_pipeline, model_type, folder) yield "✅ Export completed successfully." except Exception as e: yield f"❌ Export failed with error: {e}" raise operations = [] for root, _, files in os.walk(folder): for filename in files: file_path = os.path.join(root, filename) repo_path = os.path.relpath(file_path, folder) operations.append(CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=file_path)) try: card = ModelCard.load(model_id, token=token) if not hasattr(card.data, "tags") or card.data.tags is None: card.data.tags = [] if "neuron" not in card.data.tags: card.data.tags.append("neuron") readme_path = os.path.join(folder, "README.md") card.save(readme_path) # Check if README.md is already in operations, if so update, else add readme_op = next((op for op in operations if op.path_in_repo == "README.md"), None) if readme_op: readme_op.path_or_fileobj = readme_path else: operations.append(CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=readme_path)) except Exception as e: yield f"⚠️ Warning: Could not update model card: {e}" yield ("__RETURN__", operations) def generate_neuron_repo_name(api, original_model_id: str, task_or_pipeline: str, token:str) -> str: """Generate a name for the Neuron-optimized repository.""" # Replace '©' with '-' and add neuron suffix requesting_user = api.whoami(token=token)["name"] base_name = original_model_id.replace('/', '-') return f"{requesting_user}/{base_name}-neuron" def create_neuron_repo_and_upload( operations: List[CommitOperationAdd], original_model_id: str, model_type: str, task_or_pipeline: str, requesting_user: str, token: str, ) -> Generator[Union[str, RepoUrl], None, None]: """ Creates a new repository with Neuron files and uploads them. """ api = HfApi(token=token) if task_or_pipeline == "auto": try: task_or_pipeline = TasksManager.infer_task_from_model(original_model_id, token=token) except Exception as e: raise Exception(f"❌ Could not infer task for model {original_model_id}: {e}") # Generate repository name neuron_repo_name = generate_neuron_repo_name(api, original_model_id, task_or_pipeline, token) try: # Create the repository repo_url = create_repo( repo_id=neuron_repo_name, token=token, repo_type="model", private=False, exist_ok=True, ) # Get the appropriate class name for the Python example if model_type == "transformers": model_class = TASK_TO_MODEL_CLASS.get(task_or_pipeline) else: model_class = DIFFUSION_PIPELINE_MAPPING.get(task_or_pipeline) model_class_name = model_class.__name__ if model_class else "NeuronModel" # Create enhanced model card for the Neuron repo neuron_readme_content = f"""--- tags: - neuron - optimized - aws-neuron - {task_or_pipeline} base_model: {original_model_id} --- # Neuron-Optimized {original_model_id} This repository contains AWS Neuron-optimized files for [{original_model_id}](https://huggingface.co/{original_model_id}). ## Model Details - **Base Model**: [{original_model_id}](https://huggingface.co/{original_model_id}) - **Task**: {task_or_pipeline} - **Optimization**: AWS Neuron compilation - **Generated by**: [{requesting_user}](https://huggingface.co/{requesting_user}) - **Generated using**: [Optimum Neuron Compiler Space]({SPACES_URL}) ## Usage This model has been optimized for AWS Neuron devices (Inferentia/Trainium). To use it: ```python from optimum.neuron import {model_class_name} model = {model_class_name}.from_pretrained("{neuron_repo_name}") ``` ## Performance These files are pre-compiled for AWS Neuron devices and should provide improved inference performance compared to the original model when deployed on Inferentia or Trainium instances. ## Original Model For the original model, training details, and more information, please visit: [{original_model_id}](https://huggingface.co/{original_model_id}) """ # Update the README in operations readme_op = next((op for op in operations if op.path_in_repo == "README.md"), None) if readme_op: # Create a temporary file with the new content with NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: f.write(neuron_readme_content) readme_op.path_or_fileobj = f.name else: # Add new README operation with NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: f.write(neuron_readme_content) operations.append(CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=f.name)) # Upload files to the new repository commit_message = f"Add Neuron-optimized files for {original_model_id}" commit_description = f""" 🤖 Neuron Export Bot: Adding AWS Neuron-optimized model files. Original model: [{original_model_id}](https://huggingface.co/{original_model_id}) Task: {task_or_pipeline} Generated by: [{requesting_user}](https://huggingface.co/{requesting_user}) Generated using: [Optimum Neuron Compiler Space]({SPACES_URL}) These files have been pre-compiled for AWS Neuron devices (Inferentia/Trainium) and should provide improved inference performance. """ commit_info = api.create_commit( repo_id=neuron_repo_name, operations=operations, commit_message=commit_message, commit_description=commit_description, token=token, ) yield f"✅ Repository created: {repo_url}" except Exception as e: yield f"❌ Failed to create/upload to Neuron repository: {e}" raise def create_readme_pr_for_original_model( original_model_id: str, neuron_repo_name: str, task_or_pipeline: str, requesting_user: str, token: str, ) -> Generator[Union[str, CommitInfo], None, None]: """ Creates a PR on the original model repository to add a link to the Neuron-optimized version. """ api = HfApi(token=token) yield f"📝 Creating PR to add Neuron repo link in {original_model_id}..." try: # Check if there's already an open PR pr_title = "Add link to Neuron-optimized version" existing_pr = previous_pr(api, original_model_id, pr_title) if existing_pr: yield f"⚠️ PR already exists: https://huggingface.co/{original_model_id}/discussions/{existing_pr.num}" return # Get the current README try: current_readme_path = api.hf_hub_download( repo_id=original_model_id, filename="README.md", token=token, ) with open(current_readme_path, 'r', encoding='utf-8') as f: readme_content = f.read() except Exception: # If README doesn't exist, create a basic one readme_content = f"# {original_model_id}\n\n" # Add Neuron optimization section, separated by a horizontal rule neuron_section = f""" --- ## 🚀 AWS Neuron Optimized Version Available A Neuron-optimized version of this model is available for improved performance on AWS Inferentia/Trainium instances: **[{neuron_repo_name}](https://huggingface.co/{neuron_repo_name})** The Neuron-optimized version provides: - Pre-compiled artifacts for faster loading - Optimized performance on AWS Neuron devices - Same model capabilities with improved inference speed """ # Append the Neuron section to the end of the README updated_readme = readme_content.rstrip() + "\n" + neuron_section # Create temporary file with updated README with NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding="utf-8") as f: f.write(updated_readme) temp_readme_path = f.name # Create the PR operations = [CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=temp_readme_path)] commit_description = f""" 🤖 Neuron Export Bot: Adding link to Neuron-optimized version. A Neuron-optimized version of this model has been created at [{neuron_repo_name}](https://huggingface.co/{neuron_repo_name}). The optimized version provides improved performance on AWS Inferentia/Trainium instances with pre-compiled artifacts. Generated by: [{requesting_user}](https://huggingface.co/{requesting_user}) Generated using: [Optimum Neuron Compiler Space]({SPACES_URL}) """ pr = api.create_commit( repo_id=original_model_id, operations=operations, commit_message=pr_title, commit_description=commit_description, create_pr=True, token=token, ) yield f"✅ README PR created: https://huggingface.co/{original_model_id}/discussions/{pr.pr_num}" # Clean up temporary file os.unlink(temp_readme_path) except Exception as e: yield f"❌ Failed to create README PR: {e}" raise # --- Updated upload_to_custom_repo function (unchanged) --- def upload_to_custom_repo( operations: List[CommitOperationAdd], custom_repo_id: str, original_model_id: str, requesting_user: str, token: str, ) -> Generator[Union[str, CommitInfo], None, None]: """ Uploads neuron files to a custom repository and creates a PR. """ api = HfApi(token=token) try: # Ensure the custom repo exists api.repo_info(repo_id=custom_repo_id, repo_type="model") except Exception as e: yield f"❌ Could not access custom repository `{custom_repo_id}`. Please ensure it exists and you have write access. Error: {e}" raise pr_title = f"Add Neuron-optimized files for {original_model_id}" commit_description = f""" 🤖 Neuron Export Bot: On behalf of [{requesting_user}](https://huggingface.co/{requesting_user}), adding AWS Neuron-optimized model files for `{original_model_id}`. These files were generated using the [Optimum Neuron Compiler Space](https://huggingface.co/spaces/optimum/neuron-export). """ try: custom_pr = api.create_commit( repo_id=custom_repo_id, operations=operations, commit_message=pr_title, commit_description=commit_description, create_pr=True, token=token, ) yield f"✅ Custom PR created successfully: https://huggingface.co/{custom_repo_id}/discussions/{custom_pr.pr_num}" yield custom_pr except Exception as e: yield f"❌ Failed to create PR in custom repository: {e}" raise def convert( api: "HfApi", model_id: str, task_or_pipeline: str, model_type: str = "transformers", token: str = None, pr_options: Dict = None, ) -> Generator[Tuple[str, Any], None, None]: if pr_options is None: pr_options = {} info = api.model_info(model_id, token=token) filenames = {s.rfilename for s in info.siblings} requesting_user = api.whoami(token=token)["name"] if not any(pr_options.values()): yield "1", "⚠️ No option selected. Please choose at least one option." return if pr_options.get("create_custom_pr") and not pr_options.get("custom_repo_id"): yield "1", "⚠️ Custom PR selected but no repository ID was provided." return yield "0", f"🚀 Starting export process with options: {pr_options}..." if task_or_pipeline == "auto": try: task_or_pipeline = TasksManager.infer_task_from_model(model_id, token=token) except Exception as e: raise Exception(f"❌ Could not infer task for model {model_id}: {e}") with TemporaryDirectory() as temp_dir: export_folder = os.path.join(temp_dir, "export") cache_mirror_dir = os.path.join(temp_dir, "cache_mirror") os.makedirs(export_folder, exist_ok=True) os.makedirs(cache_mirror_dir, exist_ok=True) result_info = {} try: # --- Export Logic --- export_gen = export_and_git_add(model_id, task_or_pipeline, model_type, export_folder, token=token) operations = None for message in export_gen: if isinstance(message, tuple) and message[0] == "__RETURN__": operations = message[1] break else: yield "0", message if not operations: raise Exception("Export process did not produce any files to commit.") # --- Cache Handling --- if pr_options.get("create_cache_pr"): yield "0", f"📤 Creating a Pull Request for the cache repository ..." try: pr_title = f"Add Neuron cache artifacts for {model_id}" custom_pr_description = f""" 🤖 **Neuron Cache Sync Bot** This PR adds newly compiled cache artifacts for the model: - **Original Model ID:** `{model_id}` - **Task:** `{task_or_pipeline}` These files were generated to accelerate model loading on AWS Neuron devices. """ # 1. Create an instance of your generator commit_message = f"Synchronizing local compiler cache of {model_id}" inputs = get_default_inputs(task_or_pipeline) commit_description = f""" 🤖 **Neuron Cache Sync Bot** This commit adds newly compiled cache artifacts for the model: - **Original Model ID:** `{model_id}` - **Task:** `{task_or_pipeline}` - **Compilation inputs:** {inputs} - **Generated by:** [{requesting_user}](https://huggingface.co/{requesting_user}) - **Generated using:** [Optimum Neuron Model Exporter]({SPACES_URL}) These files were generated to accelerate model loading on AWS Neuron devices. """ pr_generator = synchronize_hub_cache_with_pr( cache_repo_id=CUSTOM_CACHE_REPO, commit_message=commit_message, commit_description=commit_description, token=token, ) pr_url = None # 2. Loop to process yielded status messages and capture the final return value while True: try: # Get the next status message from your generator status_message = next(pr_generator) yield "0", status_message except StopIteration as e: # The generator is finished. Its `return` value is in e.value. pr_url = e.value break # Exit the loop # 3. Process the final result if pr_url: yield "0", f"✅ Successfully captured PR URL." result_info["cache_pr"] = pr_url else: yield "0", "⚠️ PR process finished, but no URL was returned. This may be expected in non-blocking mode." except Exception as e: yield "0", f"❌ Failed to create cache PR: {e}" # --- New Repository Creation (Replaces Model PR) --- if pr_options.get("create_neuron_repo"): yield "0", "🏗️ Creating new Neuron-optimized repository..." neuron_repo_url = None # Generate the repo name first so we can use it consistently neuron_repo_name = generate_neuron_repo_name(api, model_id, task_or_pipeline, token) repo_creation_gen = create_neuron_repo_and_upload( operations, model_id, model_type, task_or_pipeline, requesting_user, token ) for msg in repo_creation_gen: if isinstance(msg, str): yield "0", msg else: neuron_repo_url = msg result_info["neuron_repo"] = f"https://huggingface.co/{neuron_repo_name}" # Automatically create a PR on the original model to add a link readme_pr = None readme_pr_gen = create_readme_pr_for_original_model( model_id, neuron_repo_name, task_or_pipeline, requesting_user, token ) for msg in readme_pr_gen: if isinstance(msg, str): yield "0", msg else: readme_pr = msg if readme_pr: result_info["readme_pr"] = f"https://huggingface.co/{model_id}/discussions/{readme_pr.pr_num}" # --- Custom Repository PR --- if pr_options.get("create_custom_pr"): custom_repo_id = pr_options["custom_repo_id"] yield "0", f"📤 Creating PR in custom repository: {custom_repo_id}..." custom_pr = None custom_upload_gen = upload_to_custom_repo(operations, custom_repo_id, model_id, requesting_user, token) for msg in custom_upload_gen: if isinstance(msg, str): yield "0", msg else: custom_pr = msg if custom_pr: result_info["custom_pr"] = f"https://huggingface.co/{custom_repo_id}/discussions/{custom_pr.pr_num}" yield "0", result_info except Exception as e: yield "1", f"❌ Conversion failed with a critical error: {e}" # Re-raise the exception to be caught by the outer try-except in the Gradio app if needed raise