neuron-exporter / optimum_neuron_export.py
badaoui's picture
badaoui HF Staff
bigger bs, seq_len for text-generation task
6f698cf verified
import os
import shutil
from tempfile import TemporaryDirectory, NamedTemporaryFile
from typing import List, Union, Optional, Tuple, Dict, Any, Generator
from pathlib import Path
import torch
import gradio as gr
from huggingface_hub import (
CommitOperationAdd,
HfApi,
ModelCard,
Discussion,
CommitInfo,
create_repo,
RepoUrl,
)
from huggingface_hub.file_download import repo_folder_name
from optimum.exporters.tasks import TasksManager
from optimum.exporters.neuron.model_configs import *
from optimum.exporters.neuron import build_stable_diffusion_components_mandatory_shapes
from optimum.exporters.neuron.model_configs import *
from optimum.exporters.neuron import get_submodels_and_neuron_configs, export_models
from optimum.neuron import (
NeuronModelForFeatureExtraction,
NeuronModelForSentenceTransformers,
NeuronModelForMaskedLM,
NeuronModelForQuestionAnswering,
NeuronModelForSequenceClassification,
NeuronModelForTokenClassification,
NeuronModelForMultipleChoice,
NeuronModelForImageClassification,
NeuronModelForSemanticSegmentation,
NeuronModelForObjectDetection,
NeuronModelForAudioClassification,
NeuronModelForAudioFrameClassification,
NeuronModelForCTC,
NeuronModelForXVector,
NeuronModelForCausalLM,
NeuronModelForSeq2SeqLM,
)
from diffusers import (
StableDiffusionPipeline,
StableDiffusionImg2ImgPipeline,
StableDiffusionInpaintPipeline,
StableDiffusionXLPipeline,
StableDiffusionXLImg2ImgPipeline,
StableDiffusionXLInpaintPipeline,
LatentConsistencyModelPipeline,
PixArtAlphaPipeline,
PixArtSigmaPipeline,
FluxPipeline,
FluxInpaintPipeline,
FluxImg2ImgPipeline,
FluxKontextPipeline,
)
from optimum.neuron.cache import synchronize_hub_cache
from synchronizer import synchronize_hub_cache_with_pr
SPACES_URL = "https://huggingface.co/spaces/optimum/neuron-export"
CUSTOM_CACHE_REPO = os.getenv("CUSTOM_CACHE_REPO")
HF_TOKEN = os.environ.get("HF_TOKEN")
# Task to NeuronModel mapping for transformers
TASK_TO_MODEL_CLASS = {
"feature-extraction": NeuronModelForFeatureExtraction,
"sentence-transformers": NeuronModelForSentenceTransformers,
"fill-mask": NeuronModelForMaskedLM,
"question-answering": NeuronModelForQuestionAnswering,
"text-classification": NeuronModelForSequenceClassification,
"token-classification": NeuronModelForTokenClassification,
"multiple-choice": NeuronModelForMultipleChoice,
"image-classification": NeuronModelForImageClassification,
"semantic-segmentation": NeuronModelForSemanticSegmentation,
"object-detection": NeuronModelForObjectDetection,
"audio-classification": NeuronModelForAudioClassification,
"audio-frame-classification": NeuronModelForAudioFrameClassification,
"automatic-speech-recognition": NeuronModelForCTC,
"audio-xvector": NeuronModelForXVector,
"text-generation": NeuronModelForCausalLM,
"text2text-generation": NeuronModelForSeq2SeqLM,
}
# Diffusion pipeline mapping with their corresponding diffusers classes and supported tasks
DIFFUSION_PIPELINE_MAPPING = {
"stable-diffusion": {
"class": StableDiffusionPipeline,
"tasks": ["text-to-image"],
"default_task": "text-to-image"
},
"stable-diffusion-img2img": {
"class": StableDiffusionImg2ImgPipeline,
"tasks": ["image-to-image"],
"default_task": "image-to-image"
},
"stable-diffusion-inpaint": {
"class": StableDiffusionInpaintPipeline,
"tasks": ["inpaint"],
"default_task": "inpaint"
},
"stable-diffusion-xl": {
"class": StableDiffusionXLPipeline,
"tasks": ["text-to-image"],
"default_task": "text-to-image"
},
"stable-diffusion-xl-img2img": {
"class": StableDiffusionXLImg2ImgPipeline,
"tasks": ["image-to-image"],
"default_task": "image-to-image"
},
"stable-diffusion-xl-inpaint": {
"class": StableDiffusionXLInpaintPipeline,
"tasks": ["inpaint"],
"default_task": "inpaint"
},
"lcm": {
"class": LatentConsistencyModelPipeline,
"tasks": ["text-to-image"],
"default_task": "text-to-image"
},
"pixart-alpha": {
"class": PixArtAlphaPipeline,
"tasks": ["text-to-image"],
"default_task": "text-to-image"
},
"pixart-sigma": {
"class": PixArtSigmaPipeline,
"tasks": ["text-to-image"],
"default_task": "text-to-image"
},
"flux": {
"class": FluxPipeline,
"tasks": ["text-to-image"],
"default_task": "text-to-image"
},
"flux-inpaint": {
"class": FluxInpaintPipeline,
"tasks": ["inpaint"],
"default_task": "inpaint"
},
"flux-kontext": {
"class": FluxKontextPipeline,
"tasks": ["text-to-image", "image-to-image"],
"default_task": "text-to-image"
},
}
def get_default_inputs(task_or_pipeline: str, pipeline_name: str = None) -> Dict[str, int]:
"""Get default input shapes based on task type or diffusion pipeline type."""
if task_or_pipeline in ["feature-extraction", "sentence-transformers", "fill-mask", "question-answering", "text-classification", "token-classification"]:
return {"batch_size": 1, "sequence_length": 128}
elif task_or_pipeline == "text_generation":
return {"batch_size": 4, "sequence_length": 4096, "tensor_parallel_size": 4}
elif task_or_pipeline == "multiple-choice":
return {"batch_size": 1, "num_choices": 4, "sequence_length": 128}
elif task_or_pipeline == "text2text-generation":
return {"batch_size": 1, "sequence_length": 128, "num_beams":4}
elif task_or_pipeline in ["image-classification", "semantic-segmentation", "object-detection"]:
return {"batch_size": 1, "num_channels": 3, "height": 224, "width": 224}
elif task_or_pipeline in ["audio-classification", "audio-frame-classification", "audio-xvector"]:
return {"batch_size": 1, "audio_sequence_length": 16000}
elif pipeline_name and pipeline_name in DIFFUSION_PIPELINE_MAPPING:
# For diffusion models, use appropriate sizes based on pipeline
if "xl" in pipeline_name.lower():
return {"batch_size": 1, "height": 1024, "width": 1024, "num_images_per_prompt": 1}
else:
return {"batch_size": 1, "height": 512, "width": 512, "num_images_per_prompt": 1}
else:
# Default to text-based shapes
return {"batch_size": 1, "sequence_length": 128}
def find_neuron_cache_artifacts(cache_base_dir: str = "/var/tmp/neuron-compile-cache") -> Optional[str]:
"""
Find the most recently created Neuron cache artifacts directory.
Returns the path to the MODULE directory containing the compiled artifacts.
"""
if not os.path.exists(cache_base_dir):
return None
# Find all MODULE directories
module_dirs = []
for root, dirs, files in os.walk(cache_base_dir):
for d in dirs:
if d.startswith("MODULE_"):
full_path = os.path.join(root, d)
# Check if it contains the expected files (for transformers)
if os.path.exists(os.path.join(full_path, "model.neuron")):
module_dirs.append(full_path)
if not module_dirs:
return None
# Return the most recently modified directory
return max(module_dirs, key=os.path.getmtime)
def previous_pr(api: "HfApi", model_id: str, pr_title: str) -> Optional["Discussion"]:
try:
discussions = api.get_repo_discussions(repo_id=model_id)
except Exception:
return None
for discussion in discussions:
if (
discussion.status == "open"
and discussion.is_pull_request
and discussion.title == pr_title
):
return discussion
return None
def export_diffusion_model(model_id: str, pipeline_name: str, task: str, folder: str, token: str) -> Generator:
"""Export diffusion model using optimum.exporters.neuron"""
yield f"📦 Exporting diffusion model `{model_id}` with pipeline `{pipeline_name}` for task `{task}`..."
if pipeline_name not in DIFFUSION_PIPELINE_MAPPING:
supported = list(DIFFUSION_PIPELINE_MAPPING.keys())
raise Exception(f"❌ Unsupported pipeline: {pipeline_name}. Supported: {supported}")
pipeline_config = DIFFUSION_PIPELINE_MAPPING[pipeline_name]
pipeline_class = pipeline_config["class"]
# Get default inputs
inputs = get_default_inputs(task, pipeline_name)
yield f"🔧 Using default inputs: {inputs}"
try:
# Load the pipeline
yield "📥 Loading diffusion pipeline..."
model = pipeline_class.from_pretrained(model_id, token=token)
# Build input shapes for compilation
input_shapes = build_stable_diffusion_components_mandatory_shapes(**inputs)
# Compiler arguments
compiler_kwargs = {
"auto_cast": "matmul",
"auto_cast_type": "bf16",
}
yield "🔨 Starting compilation process..."
# Get submodels and neuron configs
models_and_neuron_configs, output_model_names = get_submodels_and_neuron_configs(
model=model,
input_shapes=input_shapes,
task=task,
library_name="diffusers",
tensor_parallel_size=4,
output=Path(folder),
model_name_or_path=model_id,
)
# Export models
_, neuron_outputs = export_models(
models_and_neuron_configs=models_and_neuron_configs,
task=task,
output_dir=Path(folder),
output_file_names=output_model_names,
compiler_kwargs=compiler_kwargs,
)
yield f"✅ Diffusion model export completed. Files saved to {folder}"
except Exception as e:
yield f"❌ Export failed with error: {e}"
raise
def export_transformer_model(model_id: str, task: str, folder: str, token: str) -> Generator:
"""Export transformer model using optimum.neuron"""
yield f"📦 Exporting transformer model `{model_id}` for task `{task}`..."
model_class = TASK_TO_MODEL_CLASS.get(task)
if model_class is None:
supported = list(TASK_TO_MODEL_CLASS.keys())
raise Exception(f"❌ Unsupported task: {task}. Supported: {supported}")
inputs = get_default_inputs(task)
compiler_configs = {"auto_cast": "matmul", "auto_cast_type": "bf16", "instance_type": "inf2"}
yield f"🔧 Using default inputs: {inputs}"
try:
# Trigger the export/compilation
model = model_class.from_pretrained(
model_id,
export=True,
tensor_parallel_size=4,
token=token,
**compiler_configs,
**inputs,
)
yield "✅ Export/compilation completed successfully."
# Find the newly created cache artifacts
yield "🔍 Locating compiled artifacts in Neuron cache..."
cache_artifact_dir = find_neuron_cache_artifacts()
if not cache_artifact_dir:
raise Exception("❌ Could not find compiled artifacts in Neuron cache")
yield f"📂 Found artifacts at: {cache_artifact_dir}"
# Copy artifacts from cache to our target folder
yield f"📋 Copying artifacts to export folder..."
if os.path.exists(folder):
shutil.rmtree(folder)
shutil.copytree(cache_artifact_dir, folder)
yield f"✅ Artifacts successfully copied to {folder}"
except Exception as e:
yield f"❌ Export failed with error: {e}"
raise
def export_decoder_model(model_id: str, folder: str, token:str) -> Generator:
"""Export decoder-only models using optimum.neuron"""
yield f"📦 Exporting decoder model `{model_id}` ..."
export_kwargs = get_default_inputs("text_generation")
yield f"🔧 Using default export_kwargs: {export_kwargs}"
try:
# Trigger the export/compilation
neuron_config = NeuronModelForCausalLM.get_neuron_config(model_name_or_path=model_id, token=token, **export_kwargs)
neuron_config.target = "inf2"
model = NeuronModelForCausalLM.export(
model_id=model_id, neuron_config=neuron_config, token=token,
)
model.save_pretrained(folder)
yield f"✅ Decoder model export completed. Files saved to {folder}"
except Exception as e:
yield f"❌ Export failed with error: {e}"
raise
def export_and_git_add(model_id: str, task_or_pipeline: str, model_type: str, folder: str, token: str, pipeline_name: str = None) -> Any:
"""Export model and git add it."""
operations = []
try:
if model_type == "diffusers (soon)":
export_gen = export_diffusion_model(model_id, pipeline_name, task_or_pipeline, folder, token)
else:
if task_or_pipeline == "text-generation":
export_gen = export_decoder_model(model_id, folder, token)
else:
export_gen = export_transformer_model(model_id, task_or_pipeline, folder, token)
for message in export_gen:
yield message
# Create operations from exported files
for root, _, files in os.walk(folder):
for filename in files:
file_path = os.path.join(root, filename)
repo_path = os.path.relpath(file_path, folder)
operations.append(CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=file_path))
# Update model card
try:
card = ModelCard.load(model_id, token=token)
if not hasattr(card.data, "tags") or card.data.tags is None:
card.data.tags = []
if "neuron" not in card.data.tags:
card.data.tags.append("neuron")
readme_path = os.path.join(folder, "README.md")
card.save(readme_path)
readme_op = next((op for op in operations if op.path_in_repo == "README.md"), None)
if readme_op:
readme_op.path_or_fileobj = readme_path
else:
operations.append(CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=readme_path))
except Exception as e:
yield f"⚠️ Warning: Could not update model card: {e}"
except Exception as e:
yield f"❌ Export failed with error: {e}"
raise
yield ("__RETURN__", operations)
def generate_neuron_repo_name(api, original_model_id: str, task_or_pipeline: str, token:str) -> str:
"""Generate a name for the Neuron-optimized repository."""
requesting_user = api.whoami(token=token)["name"]
base_name = original_model_id.replace('/', '-')
return f"{requesting_user}/{base_name}-neuron"
def create_neuron_repo_and_upload(
operations: List[CommitOperationAdd],
original_model_id: str,
model_type: str,
task_or_pipeline: str,
requesting_user: str,
token: str,
pipeline_name: str = None,
) -> Generator[Union[str, RepoUrl], None, None]:
"""Creates a new repository with Neuron files and uploads them."""
api = HfApi(token=token)
if task_or_pipeline == "auto" and model_type == "transformers":
try:
task_or_pipeline = TasksManager.infer_task_from_model(original_model_id, token=token)
except Exception as e:
raise Exception(f"❌ Could not infer task for model {original_model_id}: {e}")
# Generate repository name
neuron_repo_name = generate_neuron_repo_name(api, original_model_id, task_or_pipeline, token)
try:
repo_url = create_repo(
repo_id=neuron_repo_name,
token=token,
repo_type="model",
private=False,
exist_ok=True,
)
if model_type == "transformers":
model_class = TASK_TO_MODEL_CLASS.get(task_or_pipeline)
model_class_name = model_class.__name__ if model_class else "NeuronModel"
usage_example = f"""```python
from optimum.neuron import {model_class_name}
model = {model_class_name}.from_pretrained("{neuron_repo_name}")
```"""
else:
# For diffusion models
pipeline_config = DIFFUSION_PIPELINE_MAPPING.get(pipeline_name, {})
pipeline_class = pipeline_config.get("class")
if pipeline_class:
class_name = pipeline_class.__name__.replace("Pipeline", "")
model_class_name = f"Neuron{class_name}Pipeline"
else:
model_class_name = "NeuronStableDiffusionPipeline"
usage_example = f"""```python
from optimum.neuron import {model_class_name}
pipeline = {model_class_name}.from_pretrained("{neuron_repo_name}")
```"""
# Create enhanced model card for the Neuron repo
neuron_readme_content = f"""---
tags:
- neuron
- optimized
- aws-neuron
- {task_or_pipeline}
base_model: {original_model_id}
---
# Neuron-Optimized {original_model_id}
This repository contains AWS Neuron-optimized files for [{original_model_id}](https://huggingface.co/{original_model_id}).
## Model Details
- **Base Model**: [{original_model_id}](https://huggingface.co/{original_model_id})
- **Task**: {task_or_pipeline}
- **Optimization**: AWS Neuron compilation
- **Generated by**: [{requesting_user}](https://huggingface.co/{requesting_user})
- **Generated using**: [Optimum Neuron Compiler Space]({SPACES_URL})
## Usage
This model has been optimized for AWS Neuron devices (Inferentia/Trainium). To use it:
{usage_example}
## Performance
These files are pre-compiled for AWS Neuron devices and should provide improved inference performance compared to the original model when deployed on Inferentia or Trainium instances.
## Original Model
For the original model, training details, and more information, please visit: [{original_model_id}](https://huggingface.co/{original_model_id})
"""
# Update the README in operations
readme_op = next((op for op in operations if op.path_in_repo == "README.md"), None)
if readme_op:
with NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write(neuron_readme_content)
readme_op.path_or_fileobj = f.name
else:
with NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write(neuron_readme_content)
operations.append(CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=f.name))
# Upload files to the new repository
commit_message = f"Add Neuron-optimized files for {original_model_id}"
commit_description = f"""
🤖 Neuron Export Bot: Adding AWS Neuron-optimized model files.
Original model: [{original_model_id}](https://huggingface.co/{original_model_id})
Task: {task_or_pipeline}
Generated by: [{requesting_user}](https://huggingface.co/{requesting_user})
Generated using: [Optimum Neuron Compiler Space]({SPACES_URL})
These files have been pre-compiled for AWS Neuron devices (Inferentia/Trainium) and should provide improved inference performance.
"""
commit_info = api.create_commit(
repo_id=neuron_repo_name,
operations=operations,
commit_message=commit_message,
commit_description=commit_description,
token=token,
)
yield f"✅ Repository created: {repo_url}"
except Exception as e:
yield f"❌ Failed to create/upload to Neuron repository: {e}"
raise
def create_readme_pr_for_original_model(
original_model_id: str,
neuron_repo_name: str,
task_or_pipeline: str,
requesting_user: str,
token: str,
) -> Generator[Union[str, CommitInfo], None, None]:
"""Creates a PR on the original model repository to add a link to the Neuron-optimized version."""
api = HfApi(token=token)
yield f"📝 Creating PR to add Neuron repo link in {original_model_id}..."
try:
# Check if there's already an open PR
pr_title = "Add link to Neuron-optimized version"
existing_pr = previous_pr(api, original_model_id, pr_title)
if existing_pr:
yield f"⚠️ PR already exists: https://huggingface.co/{original_model_id}/discussions/{existing_pr.num}"
return
# Get the current README
try:
current_readme_path = api.hf_hub_download(
repo_id=original_model_id,
filename="README.md",
token=token,
)
with open(current_readme_path, 'r', encoding='utf-8') as f:
readme_content = f.read()
except Exception:
readme_content = f"# {original_model_id}\n\n"
# Add Neuron optimization section, separated by a horizontal rule
neuron_section = f"""
---
## 🚀 AWS Neuron Optimized Version Available
A Neuron-optimized version of this model is available for improved performance on AWS Inferentia/Trainium instances:
**[{neuron_repo_name}](https://huggingface.co/{neuron_repo_name})**
The Neuron-optimized version provides:
- Pre-compiled artifacts for faster loading
- Optimized performance on AWS Neuron devices
- Same model capabilities with improved inference speed
"""
# Append the Neuron section to the end of the README
updated_readme = readme_content.rstrip() + "\n" + neuron_section
# Create temporary file with updated README
with NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding="utf-8") as f:
f.write(updated_readme)
temp_readme_path = f.name
# Create the PR
operations = [CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=temp_readme_path)]
commit_description = f"""
🤖 Neuron Export Bot: Adding link to Neuron-optimized version.
A Neuron-optimized version of this model has been created at [{neuron_repo_name}](https://huggingface.co/{neuron_repo_name}).
The optimized version provides improved performance on AWS Inferentia/Trainium instances with pre-compiled artifacts.
Generated by: [{requesting_user}](https://huggingface.co/{requesting_user})
Generated using: [Optimum Neuron Compiler Space]({SPACES_URL})
"""
pr = api.create_commit(
repo_id=original_model_id,
operations=operations,
commit_message=pr_title,
commit_description=commit_description,
create_pr=True,
token=token,
)
yield f"✅ README PR created: https://huggingface.co/{original_model_id}/discussions/{pr.pr_num}"
# Clean up temporary file
os.unlink(temp_readme_path)
except Exception as e:
yield f"❌ Failed to create README PR: {e}"
raise
def upload_to_custom_repo(
operations: List[CommitOperationAdd],
custom_repo_id: str,
original_model_id: str,
requesting_user: str,
token: str,
) -> Generator[Union[str, CommitInfo], None, None]:
"""Uploads neuron files to a custom repository and creates a PR."""
api = HfApi(token=token)
try:
# Ensure the custom repo exists
api.repo_info(repo_id=custom_repo_id, repo_type="model")
except Exception as e:
yield f"❌ Could not access custom repository `{custom_repo_id}`. Please ensure it exists and you have write access. Error: {e}"
raise
pr_title = f"Add Neuron-optimized files for {original_model_id}"
commit_description = f"""
🤖 Neuron Export Bot: On behalf of [{requesting_user}](https://huggingface.co/{requesting_user}), adding AWS Neuron-optimized model files for `{original_model_id}`.
These files were generated using the [Optimum Neuron Compiler Space](https://huggingface.co/spaces/optimum/neuron-export).
"""
try:
custom_pr = api.create_commit(
repo_id=custom_repo_id,
operations=operations,
commit_message=pr_title,
commit_description=commit_description,
create_pr=True,
token=token,
)
yield f"✅ Custom PR created successfully: https://huggingface.co/{custom_repo_id}/discussions/{custom_pr.pr_num}"
yield custom_pr
except Exception as e:
yield f"❌ Failed to create PR in custom repository: {e}"
raise
def convert(
api: "HfApi",
model_id: str,
task_or_pipeline: str,
model_type: str = "transformers",
token: str = None,
pr_options: Dict = None,
pipeline_name: str = None,
) -> Generator[Tuple[str, Any], None, None]:
if pr_options is None:
pr_options = {}
info = api.model_info(model_id, token=token)
filenames = {s.rfilename for s in info.siblings}
requesting_user = api.whoami(token=token)["name"]
if not any(pr_options.values()):
yield "1", "⚠️ No option selected. Please choose at least one option."
return
if pr_options.get("create_custom_pr") and not pr_options.get("custom_repo_id"):
yield "1", "⚠️ Custom PR selected but no repository ID was provided."
return
yield "0", f"🚀 Starting export process with options: {pr_options}..."
if task_or_pipeline == "auto" and model_type == "transformers":
try:
task_or_pipeline = TasksManager.infer_task_from_model(model_id, token=token)
except Exception as e:
raise Exception(f"❌ Could not infer task for model {model_id}: {e}")
with TemporaryDirectory() as temp_dir:
export_folder = os.path.join(temp_dir, "export")
cache_mirror_dir = os.path.join(temp_dir, "cache_mirror")
os.makedirs(export_folder, exist_ok=True)
os.makedirs(cache_mirror_dir, exist_ok=True)
result_info = {}
try:
# Export Logic
export_gen = export_and_git_add(model_id, task_or_pipeline, model_type, export_folder, token=token, pipeline_name=pipeline_name)
operations = None
for message in export_gen:
if isinstance(message, tuple) and message[0] == "__RETURN__":
operations = message[1]
break
else:
yield "0", message
if not operations:
raise Exception("Export process did not produce any files to commit.")
# Cache Handling
if pr_options.get("create_cache_pr"):
yield "0", f"📤 Creating a Pull Request for the cache repository ..."
try:
pr_title = f"Add Neuron cache artifacts for {model_id}"
custom_pr_description = f"""
🤖 **Neuron Cache Sync Bot**
This PR adds newly compiled cache artifacts for the model:
- **Original Model ID:** `{model_id}`
- **Task:** `{task_or_pipeline}`
These files contain precompiled Neuron-optimized representations of the model, allowing faster loading and inference on AWS Neuron hardware.
"""
# Create an instance of your generator
commit_message = f"Synchronizing local compiler cache of {model_id}"
inputs = get_default_inputs(task_or_pipeline, pipeline_name)
commit_description = f"""
🤖 **Neuron Cache Sync Bot**
This commit adds newly compiled cache artifacts for the model:
- **Original Model ID:** `{model_id}`
- **Task:** `{task_or_pipeline}`
- **Compilation inputs:** {inputs}
- **Generated by:** [{requesting_user}](https://huggingface.co/{requesting_user})
- **Generated using:** [Optimum Neuron Model Exporter]({SPACES_URL})
These files contain precompiled Neuron-optimized representations of the model, allowing faster loading and inference on AWS Neuron hardware.
"""
pr_generator = synchronize_hub_cache_with_pr(
cache_repo_id=CUSTOM_CACHE_REPO,
commit_message=commit_message,
commit_description=commit_description,
token=token,
)
pr_url = None
# Loop to process yielded status messages and capture the final return value
while True:
try:
# Get the next status message from your generator
status_message = next(pr_generator)
yield "0", status_message
except StopIteration as e:
# The generator is finished. Its `return` value is in e.value.
pr_url = e.value
break # Exit the loop
# Process the final result
if pr_url:
yield "0", f"✅ Successfully captured PR URL."
result_info["cache_pr"] = pr_url
else:
yield "0", "⚠️ PR process finished, but no URL was returned. This may be expected in non-blocking mode."
except Exception as e:
yield "0", f"❌ Failed to create cache PR: {e}"
# New Repository Creation (Replaces Model PR)
if pr_options.get("create_neuron_repo"):
yield "0", "🏗️ Creating new Neuron-optimized repository..."
neuron_repo_url = None
neuron_repo_name = generate_neuron_repo_name(api, model_id, task_or_pipeline, token)
repo_creation_gen = create_neuron_repo_and_upload(
operations, model_id, model_type, task_or_pipeline, requesting_user, token, pipeline_name
)
for msg in repo_creation_gen:
if isinstance(msg, str):
yield "0", msg
else:
neuron_repo_url = msg
result_info["neuron_repo"] = f"https://huggingface.co/{neuron_repo_name}"
# Automatically create a PR on the original model to add a link
readme_pr = None
readme_pr_gen = create_readme_pr_for_original_model(
model_id, neuron_repo_name, task_or_pipeline, requesting_user, token
)
for msg in readme_pr_gen:
if isinstance(msg, str):
yield "0", msg
else:
readme_pr = msg
if readme_pr:
result_info["readme_pr"] = f"https://huggingface.co/{model_id}/discussions/{readme_pr.pr_num}"
# Custom Repository PR
if pr_options.get("create_custom_pr"):
custom_repo_id = pr_options["custom_repo_id"]
yield "0", f"📤 Creating PR in custom repository: {custom_repo_id}..."
custom_pr = None
custom_upload_gen = upload_to_custom_repo(operations, custom_repo_id, model_id, requesting_user, token)
for msg in custom_upload_gen:
if isinstance(msg, str):
yield "0", msg
else:
custom_pr = msg
if custom_pr:
result_info["custom_pr"] = f"https://huggingface.co/{custom_repo_id}/discussions/{custom_pr.pr_num}"
yield "0", result_info
except Exception as e:
yield "1", f"❌ Conversion failed with a critical error: {e}"
raise