# Source: neuron-exporter / optimum_neuron_export.py
# Captured from the Hugging Face file viewer: last commit "Update optimum_neuron_export.py"
# by badaoui (HF Staff), revision 603a4cd (verified), file size 26.1 kB.
import os
import shutil
from tempfile import TemporaryDirectory, NamedTemporaryFile
from typing import List, Union, Optional, Tuple, Dict, Any, Generator
from pathlib import Path
import torch
import gradio as gr
from huggingface_hub import (
CommitOperationAdd,
HfApi,
ModelCard,
Discussion,
CommitInfo,
create_repo,
RepoUrl,
)
from huggingface_hub.file_download import repo_folder_name
from optimum.exporters.tasks import TasksManager
from optimum.exporters.neuron.model_configs import *
from optimum.neuron import (
NeuronModelForFeatureExtraction,
NeuronModelForSentenceTransformers,
NeuronModelForMaskedLM,
NeuronModelForQuestionAnswering,
NeuronModelForSequenceClassification,
NeuronModelForTokenClassification,
NeuronModelForMultipleChoice,
NeuronModelForImageClassification,
NeuronModelForSemanticSegmentation,
NeuronModelForObjectDetection,
NeuronModelForAudioClassification,
NeuronModelForAudioFrameClassification,
NeuronModelForCTC,
NeuronModelForXVector,
NeuronModelForCausalLM,
NeuronModelForSeq2SeqLM,
)
from optimum.neuron import (
NeuronDiffusionPipelineBase,
NeuronStableDiffusionPipeline,
NeuronStableDiffusionImg2ImgPipeline,
NeuronStableDiffusionInpaintPipeline,
NeuronStableDiffusionInstructPix2PixPipeline,
NeuronLatentConsistencyModelPipeline,
NeuronStableDiffusionXLPipeline,
NeuronStableDiffusionXLImg2ImgPipeline,
NeuronStableDiffusionXLInpaintPipeline,
NeuronStableDiffusionControlNetPipeline,
NeuronStableDiffusionXLControlNetPipeline,
NeuronPixArtAlphaPipeline,
NeuronPixArtSigmaPipeline,
NeuronFluxPipeline
)
from optimum.neuron.cache import synchronize_hub_cache
from synchronizer import synchronize_hub_cache_with_pr
from optimum.exporters.neuron import main_export, build_stable_diffusion_components_mandatory_shapes
# Public URL of the Space running this exporter; it is embedded in the READMEs
# and commit descriptions generated further down in this module.
SPACES_URL = "https://huggingface.co/spaces/optimum/neuron-export"

# Both settings come from the environment and are None when the variable is unset.
CUSTOM_CACHE_REPO = os.environ.get("CUSTOM_CACHE_REPO")
HF_TOKEN = os.environ.get("HF_TOKEN")
# Transformers task name -> Neuron model class.
# Within this module the mapping is used to pick the class name shown in the
# usage snippet of the generated repository README.
TASK_TO_MODEL_CLASS = {
    # Text encoders
    "feature-extraction": NeuronModelForFeatureExtraction,
    "sentence-transformers": NeuronModelForSentenceTransformers,
    "fill-mask": NeuronModelForMaskedLM,
    "question-answering": NeuronModelForQuestionAnswering,
    "text-classification": NeuronModelForSequenceClassification,
    "token-classification": NeuronModelForTokenClassification,
    "multiple-choice": NeuronModelForMultipleChoice,
    # Vision
    "image-classification": NeuronModelForImageClassification,
    "semantic-segmentation": NeuronModelForSemanticSegmentation,
    "object-detection": NeuronModelForObjectDetection,
    # Audio
    "audio-classification": NeuronModelForAudioClassification,
    "audio-frame-classification": NeuronModelForAudioFrameClassification,
    "automatic-speech-recognition": NeuronModelForCTC,
    "audio-xvector": NeuronModelForXVector,
    # Generation
    "text-generation": NeuronModelForCausalLM,
    "text2text-generation": NeuronModelForSeq2SeqLM,
}
# Diffusion pipeline name -> pipeline class.
# Used below to recognise diffusion requests, to load the pipeline before
# export, and to name the class in the generated README.
# NOTE(review): these un-prefixed class names are not among the explicit
# imports visible in this file (only the Neuron* variants are); presumably they
# arrive through the star import or an import outside this view -- confirm.
DIFFUSION_PIPELINE_MAPPING = {
    # Stable Diffusion family
    "text-to-image": StableDiffusionPipeline,
    "image-to-image": StableDiffusionImg2ImgPipeline,
    "inpaint": StableDiffusionInpaintPipeline,
    "instruct-pix2pix": StableDiffusionInstructPix2PixPipeline,
    "latent-consistency": LatentConsistencyModelPipeline,
    "stable-diffusion": StableDiffusionPipeline,
    # Stable Diffusion XL family
    "stable-diffusion-xl": StableDiffusionXLPipeline,
    "stable-diffusion-xl-img2img": StableDiffusionXLImg2ImgPipeline,
    "stable-diffusion-xl-inpaint": StableDiffusionXLInpaintPipeline,
    # ControlNet
    "controlnet": StableDiffusionControlNetPipeline,
    "controlnet-xl": StableDiffusionXLControlNetPipeline,
    # Transformer-based pipelines
    "pixart-alpha": PixArtAlphaPipeline,
    "pixart-sigma": PixArtSigmaPipeline,
    "flux": FluxPipeline,
}
# Task families used to choose an export path.
ENCODER_TASKS = {
    "feature-extraction",
    "sentence-transformers",
    "fill-mask",
    "question-answering",
    "text-classification",
    "token-classification",
    "multiple-choice",
    "image-classification",
    "semantic-segmentation",
    "object-detection",
    "audio-classification",
    "audio-frame-classification",
    "automatic-speech-recognition",
    "audio-xvector",
}
DECODER_TASKS = {"text-generation"}
SEQ2SEQ_TASKS = {"text2text-generation"}
# Backward-compatible alias: the constant was originally published under this
# misspelled name, and existing code still refers to it.
SEQ2SEQ_TAKS = SEQ2SEQ_TASKS
def get_default_inputs(task_or_pipeline: str) -> Dict[str, int]:
    """Return the default export input shapes for a task or diffusion pipeline.

    Args:
        task_or_pipeline: A transformers task name or a key of
            ``DIFFUSION_PIPELINE_MAPPING``.

    Returns:
        A fresh dict of shape arguments. Diffusion pipelines additionally get a
        ``torch_dtype`` entry. Names that match no known family receive the
        text defaults.

    NOTE(review): "automatic-speech-recognition" is not listed with the audio
    tasks, so it receives the text defaults -- confirm this is intended.
    """
    text_tasks = (
        "feature-extraction",
        "sentence-transformers",
        "fill-mask",
        "question-answering",
        "text-classification",
        "token-classification",
        "text-generation",
    )
    vision_tasks = ("image-classification", "semantic-segmentation", "object-detection")
    audio_tasks = ("audio-classification", "audio-frame-classification", "audio-xvector")

    if task_or_pipeline in text_tasks:
        return {"batch_size": 1, "sequence_length": 128}
    if task_or_pipeline == "multiple-choice":
        return {"batch_size": 1, "num_choices": 4, "sequence_length": 128}
    if task_or_pipeline == "text2text-generation":
        return {"batch_size": 1, "sequence_length": 128, "num_beams": 4}
    if task_or_pipeline in vision_tasks:
        return {"batch_size": 1, "num_channels": 3, "height": 224, "width": 224}
    if task_or_pipeline in audio_tasks:
        return {"batch_size": 1, "audio_sequence_length": 16000}
    if task_or_pipeline in DIFFUSION_PIPELINE_MAPPING:
        return {
            "batch_size": 1,
            "height": 1024,
            "width": 1024,
            "num_images_per_prompt": 1,
            "torch_dtype": torch.bfloat16,
        }
    # Anything unrecognised is treated like a text task.
    return {"batch_size": 1, "sequence_length": 128}
def previous_pr(api: "HfApi", model_id: str, pr_title: str) -> Optional["Discussion"]:
    """Return the open pull request titled ``pr_title`` on ``model_id``, if any.

    The lookup is best-effort: any error while listing the discussions is
    treated as "no previous PR" and ``None`` is returned.

    Args:
        api: Hub client exposing ``get_repo_discussions(repo_id=...)``.
        model_id: Repository whose discussions are scanned.
        pr_title: Exact title to look for.

    Returns:
        The first discussion that is open, is a pull request and has exactly
        this title; otherwise ``None``.
    """
    try:
        # Iterate inside the ``try``: the discussions may be produced lazily,
        # in which case listing errors only surface while iterating, not when
        # the call itself returns.
        for discussion in api.get_repo_discussions(repo_id=model_id):
            if (
                discussion.status == "open"
                and discussion.is_pull_request
                and discussion.title == pr_title
            ):
                return discussion
    except Exception:
        return None
    return None
def export(model_id: str, task_or_pipeline: str, model_type: str, folder: str):
    """Export ``model_id`` into ``folder``, yielding progress messages.

    This is a generator: nothing is exported until it is iterated.

    Exactly one export path is taken, chosen from ``task_or_pipeline``:

    * encoder and seq2seq tasks go through ``main_export``;
    * decoder tasks go through ``NeuronModelForCausalLM.export`` and the result
      is saved with ``save_pretrained``;
    * diffusion pipelines are loaded with the class registered in
      ``DIFFUSION_PIPELINE_MAPPING`` and then passed to ``main_export``.

    Args:
        model_id: Hub id of the model to export.
        task_or_pipeline: Task name or diffusion pipeline name (already
            resolved; ``"auto"`` is not handled here).
        model_type: Library name, forwarded as ``library_name`` for diffusion
            exports.
        folder: Output directory for the exported files.

    Yields:
        Human-readable status strings.

    Raises:
        ValueError: If ``task_or_pipeline`` belongs to no supported family.
    """
    yield f"📦 Exporting model `{model_id}` for task `{task_or_pipeline}`..."

    inputs = get_default_inputs(task_or_pipeline)
    yield f"🔧 Using default inputs: {inputs}"

    # Test membership in each set explicitly: ``x in A or B`` is always truthy
    # for a non-empty ``B`` and would send every task down this branch.
    if task_or_pipeline in ENCODER_TASKS or task_or_pipeline in SEQ2SEQ_TAKS:
        main_export(
            model_name_or_path=model_id,
            output=folder,
            token=HF_TOKEN,
            task=task_or_pipeline,
            cpu_backend=True,
            do_validation=False,
            **inputs,
        )
    elif task_or_pipeline in DECODER_TASKS:
        neuron_config = NeuronModelForCausalLM.get_neuron_config(
            model_name_or_path=model_id, **inputs
        )
        neuron_model = NeuronModelForCausalLM.export(
            model_id=model_id,
            neuron_config=neuron_config,
            token=HF_TOKEN,
        )
        neuron_model.save_pretrained(folder)
    elif task_or_pipeline in DIFFUSION_PIPELINE_MAPPING:
        # Resolve the class here so it is always bound on this path, whatever
        # ``model_type`` the caller passed.
        pipeline_class = DIFFUSION_PIPELINE_MAPPING[task_or_pipeline]
        pipeline = pipeline_class.from_pretrained(model_id)
        # NOTE(review): the diffusion defaults include a ``torch_dtype`` entry
        # that is forwarded verbatim to this helper -- confirm it accepts it.
        input_shapes = build_stable_diffusion_components_mandatory_shapes(**inputs)
        compiler_kwargs = {"auto_cast": "matmul", "auto_cast_type": "bf16"}
        main_export(
            model_name_or_path=model_id,
            output=folder,
            compiler_kwargs=compiler_kwargs,
            token=HF_TOKEN,
            library_name=model_type,
            cpu_backend=True,
            model=pipeline,
            **input_shapes,
        )
    else:
        raise ValueError(
            f"Unsupported task or pipeline `{task_or_pipeline}` for model `{model_id}`."
        )
def export_and_git_add(model_id: str, task_or_pipeline: str, model_type: str, folder: str, token: str) -> Any:
    """Export the model, then build one commit operation per produced file.

    This is a generator. It relays the progress messages of ``export`` and its
    own status strings, and finishes by yielding the tuple
    ``("__RETURN__", operations)``, where ``operations`` is a list of
    ``CommitOperationAdd`` covering every file found under ``folder``.

    Args:
        model_id: Hub id of the model to export.
        task_or_pipeline: Task name or diffusion pipeline name.
        model_type: Library name passed through to ``export``.
        folder: Directory that receives the exported files.
        token: Hub token used to load the model card.

    Raises:
        Exception: Whatever ``export`` raises, re-raised after a failure
            message has been yielded.
    """
    try:
        # ``export`` is a generator: it must be iterated for the export to run.
        # ``yield from`` drives it and forwards its progress messages.
        yield from export(model_id, task_or_pipeline, model_type, folder)
        yield "✅ Export completed successfully."
    except Exception as e:
        yield f"❌ Export failed with error: {e}"
        raise

    # Write the tagged model card into the folder *before* collecting files, so
    # the README operation is created from the final file content.
    # Failure here is non-fatal: the export is still usable without a card.
    try:
        card = ModelCard.load(model_id, token=token)
        if getattr(card.data, "tags", None) is None:
            card.data.tags = []
        if "neuron" not in card.data.tags:
            card.data.tags.append("neuron")
        card.save(os.path.join(folder, "README.md"))
    except Exception as e:
        yield f"⚠️ Warning: Could not update model card: {e}"

    operations = []
    for root, _, files in os.walk(folder):
        for filename in files:
            file_path = os.path.join(root, filename)
            repo_path = os.path.relpath(file_path, folder)
            operations.append(CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=file_path))

    # Sentinel tuple the caller looks for to pick up the result.
    yield ("__RETURN__", operations)
def generate_neuron_repo_name(api, original_model_id: str, task_or_pipeline: str, token: str) -> str:
    """Build the repository id that will hold the Neuron export.

    The id has the form ``<user>/<flattened-model-id>-neuron``, where ``<user>``
    is the ``name`` reported by ``api.whoami`` for ``token`` and every ``/`` in
    the original model id is replaced with ``-``.

    ``task_or_pipeline`` is accepted but does not influence the result.
    """
    owner = api.whoami(token=token)["name"]
    flattened_id = "-".join(original_model_id.split("/"))
    return "{}/{}-neuron".format(owner, flattened_id)
def create_neuron_repo_and_upload(
    operations: List[CommitOperationAdd],
    original_model_id: str,
    model_type: str,
    task_or_pipeline: str,
    requesting_user: str,
    token: str,
) -> Generator[Union[str, RepoUrl], None, None]:
    """Create the Neuron repository and upload the exported files to it.

    A README describing the export is generated and committed together with
    the files in ``operations``; any README operation already present in
    ``operations`` is left out of the commit in favour of the generated one.
    The caller's list and its operations are not modified.

    Args:
        operations: Commit operations for the exported files.
        original_model_id: Hub id of the model that was exported.
        model_type: ``"transformers"`` selects ``TASK_TO_MODEL_CLASS`` for the
            README usage snippet; any other value selects
            ``DIFFUSION_PIPELINE_MAPPING``.
        task_or_pipeline: Task or pipeline name; ``"auto"`` is resolved with
            ``TasksManager.infer_task_from_model``.
        requesting_user: User name credited in the README and commit text.
        token: Hub token used for every Hub call.

    Yields:
        Status strings.

    Raises:
        Exception: If the task cannot be inferred, or if repository creation or
            the upload fails (re-raised after a failure message is yielded).
    """
    api = HfApi(token=token)
    if task_or_pipeline == "auto":
        try:
            task_or_pipeline = TasksManager.infer_task_from_model(original_model_id, token=token)
        except Exception as e:
            raise Exception(f"❌ Could not infer task for model {original_model_id}: {e}") from e

    # Generate repository name
    neuron_repo_name = generate_neuron_repo_name(api, original_model_id, task_or_pipeline, token)
    try:
        # Create the repository
        repo_url = create_repo(
            repo_id=neuron_repo_name,
            token=token,
            repo_type="model",
            private=False,
            exist_ok=True,
        )

        # Get the appropriate class name for the Python example
        if model_type == "transformers":
            model_class = TASK_TO_MODEL_CLASS.get(task_or_pipeline)
        else:
            model_class = DIFFUSION_PIPELINE_MAPPING.get(task_or_pipeline)
        model_class_name = model_class.__name__ if model_class else "NeuronModel"
        # The snippet imports from ``optimum.neuron``, whose classes all carry
        # the ``Neuron`` prefix; the diffusion mapping holds un-prefixed names.
        if not model_class_name.startswith("Neuron"):
            model_class_name = f"Neuron{model_class_name}"

        # Create enhanced model card for the Neuron repo
        neuron_readme_content = f"""---
tags:
- neuron
- optimized
- aws-neuron
- {task_or_pipeline}
base_model: {original_model_id}
---
# Neuron-Optimized {original_model_id}
This repository contains AWS Neuron-optimized files for [{original_model_id}](https://huggingface.co/{original_model_id}).
## Model Details
- **Base Model**: [{original_model_id}](https://huggingface.co/{original_model_id})
- **Task**: {task_or_pipeline}
- **Optimization**: AWS Neuron compilation
- **Generated by**: [{requesting_user}](https://huggingface.co/{requesting_user})
- **Generated using**: [Optimum Neuron Compiler Space]({SPACES_URL})
## Usage
This model has been optimized for AWS Neuron devices (Inferentia/Trainium). To use it:
```python
from optimum.neuron import {model_class_name}
model = {model_class_name}.from_pretrained("{neuron_repo_name}")
```
## Performance
These files are pre-compiled for AWS Neuron devices and should provide improved inference performance compared to the original model when deployed on Inferentia or Trainium instances.
## Original Model
For the original model, training details, and more information, please visit: [{original_model_id}](https://huggingface.co/{original_model_id})
"""
        # Write the README to a temporary file. UTF-8 is explicit so the result
        # does not depend on the platform's default encoding.
        with NamedTemporaryFile(mode="w", suffix=".md", delete=False, encoding="utf-8") as f:
            f.write(neuron_readme_content)
            readme_path = f.name

        try:
            # Build a separate operation list with a fresh README operation
            # instead of re-pointing the caller's one at the temporary file,
            # which is deleted as soon as the commit is done.
            upload_operations = [op for op in operations if op.path_in_repo != "README.md"]
            upload_operations.append(
                CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=readme_path)
            )

            # Upload files to the new repository
            commit_message = f"Add Neuron-optimized files for {original_model_id}"
            commit_description = f"""
🤖 Neuron Export Bot: Adding AWS Neuron-optimized model files.
Original model: [{original_model_id}](https://huggingface.co/{original_model_id})
Task: {task_or_pipeline}
Generated by: [{requesting_user}](https://huggingface.co/{requesting_user})
Generated using: [Optimum Neuron Compiler Space]({SPACES_URL})
These files have been pre-compiled for AWS Neuron devices (Inferentia/Trainium) and should provide improved inference performance.
"""
            api.create_commit(
                repo_id=neuron_repo_name,
                operations=upload_operations,
                commit_message=commit_message,
                commit_description=commit_description,
                token=token,
            )
        finally:
            # Remove the temporary README whether or not the commit succeeded.
            os.unlink(readme_path)

        yield f"✅ Repository created: {repo_url}"
    except Exception as e:
        yield f"❌ Failed to create/upload to Neuron repository: {e}"
        raise
def create_readme_pr_for_original_model(
    original_model_id: str,
    neuron_repo_name: str,
    task_or_pipeline: str,
    requesting_user: str,
    token: str,
) -> Generator[Union[str, CommitInfo], None, None]:
    """
    Creates a PR on the original model repository to add a link to the Neuron-optimized version.

    The current README (or a minimal stub when it cannot be downloaded) gets a
    section appended that points at ``neuron_repo_name``, and the result is
    proposed as a pull request. Nothing is created when an open PR with the
    same title already exists.

    Args:
        original_model_id: Repository that receives the PR.
        neuron_repo_name: Repository id of the Neuron export to link to.
        task_or_pipeline: Accepted but not used by this function.
        requesting_user: User name credited in the PR description.
        token: Hub token used for every Hub call.

    Yields:
        Status strings and, after a PR has been created, the object returned
        by ``create_commit``.

    Raises:
        Exception: Any failure, re-raised after a failure message is yielded.
    """
    api = HfApi(token=token)
    yield f"📝 Creating PR to add Neuron repo link in {original_model_id}..."
    try:
        # Check if there's already an open PR
        pr_title = "Add link to Neuron-optimized version"
        existing_pr = previous_pr(api, original_model_id, pr_title)
        if existing_pr:
            yield f"⚠️ PR already exists: https://huggingface.co/{original_model_id}/discussions/{existing_pr.num}"
            return

        # Get the current README
        try:
            current_readme_path = api.hf_hub_download(
                repo_id=original_model_id,
                filename="README.md",
                token=token,
            )
            with open(current_readme_path, 'r', encoding='utf-8') as f:
                readme_content = f.read()
        except Exception:
            # If README doesn't exist, create a basic one
            readme_content = f"# {original_model_id}\n\n"

        # Add Neuron optimization section, separated by a horizontal rule
        neuron_section = f"""
---
## 🚀 AWS Neuron Optimized Version Available
A Neuron-optimized version of this model is available for improved performance on AWS Inferentia/Trainium instances:
**[{neuron_repo_name}](https://huggingface.co/{neuron_repo_name})**
The Neuron-optimized version provides:
- Pre-compiled artifacts for faster loading
- Optimized performance on AWS Neuron devices
- Same model capabilities with improved inference speed
"""
        # Append the Neuron section to the end of the README
        updated_readme = readme_content.rstrip() + "\n" + neuron_section

        # Create temporary file with updated README
        with NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding="utf-8") as f:
            f.write(updated_readme)
            temp_readme_path = f.name

        try:
            # Create the PR
            operations = [CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=temp_readme_path)]
            commit_description = f"""
🤖 Neuron Export Bot: Adding link to Neuron-optimized version.
A Neuron-optimized version of this model has been created at [{neuron_repo_name}](https://huggingface.co/{neuron_repo_name}).
The optimized version provides improved performance on AWS Inferentia/Trainium instances with pre-compiled artifacts.
Generated by: [{requesting_user}](https://huggingface.co/{requesting_user})
Generated using: [Optimum Neuron Compiler Space]({SPACES_URL})
"""
            pr = api.create_commit(
                repo_id=original_model_id,
                operations=operations,
                commit_message=pr_title,
                commit_description=commit_description,
                create_pr=True,
                token=token,
            )
        finally:
            # Clean up the temporary file even when the commit fails.
            os.unlink(temp_readme_path)

        yield f"✅ README PR created: https://huggingface.co/{original_model_id}/discussions/{pr.pr_num}"
        # Hand the commit result to the caller, as the return annotation
        # promises, so the PR can be recorded in the caller's results.
        yield pr
    except Exception as e:
        yield f"❌ Failed to create README PR: {e}"
        raise
# --- Updated upload_to_custom_repo function (unchanged) ---
def upload_to_custom_repo(
    operations: List[CommitOperationAdd],
    custom_repo_id: str,
    original_model_id: str,
    requesting_user: str,
    token: str,
) -> Generator[Union[str, CommitInfo], None, None]:
    """
    Open a pull request on a user-chosen repository containing the Neuron files.

    The repository is looked up first; if that fails, an explanatory message is
    yielded and the error is re-raised. Otherwise ``operations`` are committed
    with ``create_pr=True``.

    Yields:
        Status strings, followed on success by the object returned from
        ``create_commit``.

    Raises:
        Exception: When the repository lookup or the commit fails.
    """
    hub = HfApi(token=token)

    # Verify the target repository is reachable before attempting the upload.
    try:
        hub.repo_info(repo_id=custom_repo_id, repo_type="model")
    except Exception as access_error:
        yield f"❌ Could not access custom repository `{custom_repo_id}`. Please ensure it exists and you have write access. Error: {access_error}"
        raise

    title = f"Add Neuron-optimized files for {original_model_id}"
    description = f"""
🤖 Neuron Export Bot: On behalf of [{requesting_user}](https://huggingface.co/{requesting_user}), adding AWS Neuron-optimized model files for `{original_model_id}`.
These files were generated using the [Optimum Neuron Compiler Space](https://huggingface.co/spaces/optimum/neuron-export).
"""
    commit_kwargs = dict(
        repo_id=custom_repo_id,
        operations=operations,
        commit_message=title,
        commit_description=description,
        create_pr=True,
        token=token,
    )
    try:
        opened_pr = hub.create_commit(**commit_kwargs)
        discussion_url = f"https://huggingface.co/{custom_repo_id}/discussions/{opened_pr.pr_num}"
        yield f"✅ Custom PR created successfully: {discussion_url}"
        yield opened_pr
    except Exception as commit_error:
        yield f"❌ Failed to create PR in custom repository: {commit_error}"
        raise
def convert(
    api: "HfApi",
    model_id: str,
    task_or_pipeline: str,
    model_type: str = "transformers",
    token: Optional[str] = None,
    pr_options: Optional[Dict] = None,
) -> Generator[Tuple[str, Any], None, None]:
    """Run the whole export-and-publish workflow, streaming progress.

    Every yielded item is a ``(code, payload)`` tuple. ``code`` is ``"0"`` for
    progress and results and ``"1"`` for a validation problem or a fatal error.
    ``payload`` is a status string, except for the last successful item, whose
    payload is a dict of the URLs that were produced (keys ``cache_pr``,
    ``neuron_repo``, ``readme_pr``, ``custom_pr``, depending on the options).

    Args:
        api: Hub client used for the model lookup and ``whoami``.
        model_id: Hub id of the model to export.
        task_or_pipeline: Task or pipeline name; ``"auto"`` is resolved with
            ``TasksManager.infer_task_from_model``.
        model_type: Library name passed on to the export step.
        token: Hub token used for every Hub call.
        pr_options: Flags ``create_cache_pr``, ``create_neuron_repo`` and
            ``create_custom_pr``, plus ``custom_repo_id`` for the latter.

    Raises:
        Exception: If the model lookup or task inference fails, or if the
            export or an upload fails (after a ``"1"`` message is yielded).
            A failed cache PR is reported but does not abort the run.
    """
    if pr_options is None:
        pr_options = {}

    # Look the model up first so a missing or inaccessible repository fails
    # before any work is done; the returned info itself is not needed.
    api.model_info(model_id, token=token)
    requesting_user = api.whoami(token=token)["name"]

    if not any(pr_options.values()):
        yield "1", "⚠️ No option selected. Please choose at least one option."
        return
    if pr_options.get("create_custom_pr") and not pr_options.get("custom_repo_id"):
        yield "1", "⚠️ Custom PR selected but no repository ID was provided."
        return

    yield "0", f"🚀 Starting export process with options: {pr_options}..."

    if task_or_pipeline == "auto":
        try:
            task_or_pipeline = TasksManager.infer_task_from_model(model_id, token=token)
        except Exception as e:
            raise Exception(f"❌ Could not infer task for model {model_id}: {e}") from e

    with TemporaryDirectory() as temp_dir:
        export_folder = os.path.join(temp_dir, "export")
        os.makedirs(export_folder, exist_ok=True)
        result_info = {}
        try:
            # --- Export Logic ---
            export_gen = export_and_git_add(model_id, task_or_pipeline, model_type, export_folder, token=token)
            operations = None
            for message in export_gen:
                if isinstance(message, tuple) and message[0] == "__RETURN__":
                    operations = message[1]
                    break
                yield "0", message
            if not operations:
                raise Exception("Export process did not produce any files to commit.")

            # --- Cache Handling ---
            if pr_options.get("create_cache_pr"):
                yield "0", "📤 Creating a Pull Request for the cache repository ..."
                try:
                    commit_message = f"Synchronizing local compiler cache of {model_id}"
                    inputs = get_default_inputs(task_or_pipeline)
                    commit_description = f"""
🤖 **Neuron Cache Sync Bot**
This commit adds newly compiled cache artifacts for the model:
- **Original Model ID:** `{model_id}`
- **Task:** `{task_or_pipeline}`
- **Compilation inputs:** {inputs}
- **Generated by:** [{requesting_user}](https://huggingface.co/{requesting_user})
- **Generated using:** [Optimum Neuron Model Exporter]({SPACES_URL})
These files were generated to accelerate model loading on AWS Neuron devices.
"""
                    pr_generator = synchronize_hub_cache_with_pr(
                        cache_repo_id=CUSTOM_CACHE_REPO,
                        commit_message=commit_message,
                        commit_description=commit_description,
                        token=token,
                    )
                    pr_url = None
                    # Drive the generator by hand: a ``for`` loop would discard
                    # its ``return`` value, which arrives on ``StopIteration``.
                    while True:
                        try:
                            status_message = next(pr_generator)
                        except StopIteration as stop:
                            pr_url = stop.value
                            break
                        yield "0", status_message

                    if pr_url:
                        yield "0", "✅ Successfully captured PR URL."
                        result_info["cache_pr"] = pr_url
                    else:
                        yield "0", "⚠️ PR process finished, but no URL was returned. This may be expected in non-blocking mode."
                except Exception as e:
                    # The cache PR is optional: report the failure and carry on.
                    yield "0", f"❌ Failed to create cache PR: {e}"

            # --- New Repository Creation (Replaces Model PR) ---
            if pr_options.get("create_neuron_repo"):
                yield "0", "🏗️ Creating new Neuron-optimized repository..."
                # Generate the repo name first so we can use it consistently
                neuron_repo_name = generate_neuron_repo_name(api, model_id, task_or_pipeline, token)
                repo_creation_gen = create_neuron_repo_and_upload(
                    operations, model_id, model_type, task_or_pipeline, requesting_user, token
                )
                for msg in repo_creation_gen:
                    if isinstance(msg, str):
                        yield "0", msg
                result_info["neuron_repo"] = f"https://huggingface.co/{neuron_repo_name}"

                # Automatically create a PR on the original model to add a link
                readme_pr = None
                readme_pr_gen = create_readme_pr_for_original_model(
                    model_id, neuron_repo_name, task_or_pipeline, requesting_user, token
                )
                for msg in readme_pr_gen:
                    # Test for CommitInfo before ``str``: CommitInfo is itself
                    # a ``str`` subclass, so the reverse order would relay the
                    # result as a log line instead of capturing it.
                    if isinstance(msg, CommitInfo):
                        readme_pr = msg
                    elif isinstance(msg, str):
                        yield "0", msg
                    else:
                        readme_pr = msg
                if readme_pr:
                    result_info["readme_pr"] = f"https://huggingface.co/{model_id}/discussions/{readme_pr.pr_num}"

            # --- Custom Repository PR ---
            # NOTE(review): ``operations`` may already have been committed to
            # the Neuron repository above; confirm the Hub client allows the
            # same operation objects to be committed a second time.
            if pr_options.get("create_custom_pr"):
                custom_repo_id = pr_options["custom_repo_id"]
                yield "0", f"📤 Creating PR in custom repository: {custom_repo_id}..."
                custom_pr = None
                custom_upload_gen = upload_to_custom_repo(operations, custom_repo_id, model_id, requesting_user, token)
                for msg in custom_upload_gen:
                    if isinstance(msg, CommitInfo):
                        custom_pr = msg
                    elif isinstance(msg, str):
                        yield "0", msg
                    else:
                        custom_pr = msg
                if custom_pr:
                    result_info["custom_pr"] = f"https://huggingface.co/{custom_repo_id}/discussions/{custom_pr.pr_num}"

            yield "0", result_info
        except Exception as e:
            yield "1", f"❌ Conversion failed with a critical error: {e}"
            # Re-raise the exception to be caught by the outer try-except in the Gradio app if needed
            raise