""" |
|
|
WrinkleBrane Dataset Builder & HuggingFace Integration |
|
|
|
|
|
Creates curated datasets optimized for associative memory training with |
|
|
membrane storage, interference studies, and orthogonality benchmarks. |
|
|
""" |
|
|
|
|
|
import gzip
import json
import os
import random
import tempfile
from datetime import datetime
from typing import Dict, List, Tuple

import numpy as np
from datasets import Dataset, DatasetDict
from huggingface_hub import HfApi, create_repo, login


class WrinkleBraneDatasetBuilder:
    """
    Comprehensive dataset builder for WrinkleBrane associative memory training.

    Generates:
    - Key-value pairs for associative memory tasks
    - Visual patterns (MNIST-style digits, geometric shapes)
    - Interference benchmark sequences
    - Orthogonality optimization data
    - Persistence decay studies
    """

    def __init__(self, hf_token: str, repo_id: str = "WrinkleBrane"):
        """Initialize with HuggingFace credentials."""
        self.hf_token = hf_token
        self.repo_id = repo_id
        self.api = HfApi()

        # Authenticate once so subsequent Hub calls can reuse the session.
        login(token=hf_token)

        self.config = {
            "version": "1.0.0",
            "created": datetime.now().isoformat(),
            "model_compatibility": "WrinkleBrane",
            "membrane_encoding": "2D_spatial_maps",
            "default_H": 64,
            "default_W": 64,
            "default_L": 64,
            "default_K": 64,
            "total_samples": 20000,
            "quality_thresholds": {
                "min_fidelity_psnr": 20.0,
                "max_interference_rms": 0.1,
                "min_orthogonality": 0.8
            }
        }

    def generate_visual_memory_pairs(self, num_samples: int = 5000, H: int = 64, W: int = 64) -> List[Dict]:
        """Generate visual key-value pairs for associative memory."""
        samples = []

        visual_types = [
            "mnist_digits",
            "geometric_shapes",
            "noise_patterns",
            "edge_features",
            "texture_patches",
            "sparse_dots"
        ]

        for i in range(num_samples):
            visual_type = random.choice(visual_types)

            # Key: the cue pattern; value: a related pattern to retrieve.
            key_pattern = self._generate_visual_pattern(visual_type, H, W, is_key=True)
            value_pattern = self._generate_visual_pattern(visual_type, H, W, is_key=False)

            # Per-sample quality metrics.
            fidelity_psnr = self._compute_psnr(key_pattern, value_pattern)
            orthogonality = self._compute_orthogonality(key_pattern.flatten(), value_pattern.flatten())
            compressibility = self._compute_gzip_ratio(key_pattern)

            sample = {
                "id": f"visual_{visual_type}_{i:06d}",
                "key_pattern": key_pattern.tolist(),
                "value_pattern": value_pattern.tolist(),
                "pattern_type": visual_type,
                "H": H,
                "W": W,
                "fidelity_psnr": float(fidelity_psnr),
                "orthogonality": float(orthogonality),
                "compressibility": float(compressibility),
                "category": "visual_memory",
                # None-padded fields keep one schema across all categories.
                "interference_rms": None,
                "persistence_lambda": None,
                "codebook_type": None,
                "capacity_load": None,
                "time_step": None,
                "energy_retention": None,
                "temporal_correlation": None,
                "L": None,
                "K": None,
                "reconstruction_error": None,
                "reconstructed_pattern": None,
                "codebook_matrix": None
            }
            samples.append(sample)

        return samples

    def generate_synthetic_maps(self, num_samples: int = 3000, H: int = 64, W: int = 64) -> List[Dict]:
        """Generate synthetic spatial pattern mappings."""
        samples = []

        map_types = [
            "gaussian_fields",
            "spiral_patterns",
            "frequency_domains",
            "cellular_automata",
            "fractal_structures",
            "gradient_maps"
        ]

        for i in range(num_samples):
            map_type = random.choice(map_types)

            # Seed key and value differently so each pair is reproducible.
            key_map = self._generate_synthetic_map(map_type, H, W, seed=i * 2)
            value_map = self._generate_synthetic_map(map_type, H, W, seed=i * 2 + 1)

            # Couple the value to the key with a type-specific transform.
            value_map = self._apply_map_transform(key_map, value_map, map_type)

            fidelity_psnr = self._compute_psnr(key_map, value_map)
            orthogonality = self._compute_orthogonality(key_map.flatten(), value_map.flatten())
            compressibility = self._compute_gzip_ratio(key_map)

            sample = {
                "id": f"synthetic_{map_type}_{i:06d}",
                "key_pattern": key_map.tolist(),
                "value_pattern": value_map.tolist(),
                "pattern_type": map_type,
                "H": H,
                "W": W,
                "fidelity_psnr": float(fidelity_psnr),
                "orthogonality": float(orthogonality),
                "compressibility": float(compressibility),
                "category": "synthetic_maps",
                # None-padded fields keep one schema across all categories.
                "interference_rms": None,
                "persistence_lambda": None,
                "codebook_type": None,
                "capacity_load": None,
                "time_step": None,
                "energy_retention": None,
                "temporal_correlation": None,
                "L": None,
                "K": None,
                "reconstruction_error": None,
                "reconstructed_pattern": None,
                "codebook_matrix": None
            }
            samples.append(sample)

        return samples

    def generate_interference_studies(self, num_samples: int = 2000, H: int = 64, W: int = 64) -> List[Dict]:
        """Generate data for studying memory interference and capacity limits."""
        samples = []

        # Fractions of membrane capacity to probe, from light to near-saturated.
        capacity_loads = [0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]

        for load in capacity_loads:
            # Split the sample budget evenly across the load levels.
            load_samples = num_samples // len(capacity_loads)

            for i in range(load_samples):
                # Number of stored patterns implied by this capacity load.
                num_patterns = max(1, int(64 * load))

                patterns = []
                for p in range(min(num_patterns, 10)):  # cap at 10 for tractability
                    pattern = np.random.randn(H, W).astype(np.float32)
                    pattern = (pattern - pattern.mean()) / pattern.std()
                    patterns.append(pattern)

                # Superpose the stored patterns; try to recover the first one.
                composite = np.sum(patterns, axis=0) / len(patterns)
                target = patterns[0] if patterns else composite

                interference_rms = self._compute_interference_rms(patterns, target)
                fidelity_psnr = self._compute_psnr(composite, target)
                orthogonality = self._compute_pattern_orthogonality(patterns)

                sample = {
                    "id": f"interference_load_{load}_{i:06d}",
                    "key_pattern": composite.tolist(),
                    "value_pattern": target.tolist(),
                    "pattern_type": "interference_test",
                    "H": H,
                    "W": W,
                    "capacity_load": float(load),
                    "interference_rms": float(interference_rms),
                    "fidelity_psnr": float(fidelity_psnr),
                    "orthogonality": float(orthogonality),
                    "category": "interference_study",
                    # None-padded fields keep one schema across all categories.
                    "compressibility": None,
                    "persistence_lambda": None,
                    "codebook_type": None,
                    "time_step": None,
                    "energy_retention": None,
                    "temporal_correlation": None,
                    "L": None,
                    "K": None,
                    "reconstruction_error": None,
                    "reconstructed_pattern": None,
                    "codebook_matrix": None
                }
                samples.append(sample)

        return samples

    def generate_orthogonality_benchmarks(self, num_samples: int = 1500, L: int = 64, K: int = 64) -> List[Dict]:
        """Generate codebook optimization data for orthogonality studies."""
        samples = []

        codebook_types = [
            "hadamard",
            "random_orthogonal",
            "dct_basis",
            "wavelet_basis",
            "learned_sparse"
        ]

        for codebook_type in codebook_types:
            type_samples = num_samples // len(codebook_types)

            for i in range(type_samples):
                codebook = self._generate_codebook(codebook_type, L, K, seed=i)

                # Random key/value pair for probing the codebook.
                H, W = 64, 64
                test_key = np.random.randn(H, W).astype(np.float32)
                test_value = np.random.randn(H, W).astype(np.float32)

                # Write the value into a simulated membrane stack, then read it back.
                written_membrane, read_result = self._simulate_membrane_operation(
                    codebook, test_key, test_value, H, W
                )

                orthogonality = self._compute_codebook_orthogonality(codebook)
                reconstruction_error = np.mean((test_value - read_result) ** 2)

                sample = {
                    "id": f"orthogonal_{codebook_type}_{i:06d}",
                    "key_pattern": test_key.tolist(),
                    "value_pattern": test_value.tolist(),
                    "reconstructed_pattern": read_result.tolist(),
                    "codebook_matrix": codebook.tolist(),
                    "pattern_type": "orthogonality_test",
                    "codebook_type": codebook_type,
                    "H": H,
                    "W": W,
                    "L": L,
                    "K": K,
                    "orthogonality": float(orthogonality),
                    "reconstruction_error": float(reconstruction_error),
                    "category": "orthogonality_benchmark",
                    # None-padded fields keep one schema across all categories.
                    "fidelity_psnr": None,
                    "compressibility": None,
                    "interference_rms": None,
                    "persistence_lambda": None,
                    "capacity_load": None,
                    "time_step": None,
                    "energy_retention": None,
                    "temporal_correlation": None
                }
                samples.append(sample)

        return samples

    def generate_persistence_traces(self, num_samples: int = 1000, H: int = 64, W: int = 64) -> List[Dict]:
        """Generate temporal decay studies for persistence analysis."""
        samples = []

        # Decay factors and time horizons to sweep.
        lambda_values = [0.95, 0.97, 0.98, 0.99, 0.995]
        time_steps = [1, 5, 10, 20, 50, 100]

        for lambda_val in lambda_values:
            for time_step in time_steps:
                step_samples = max(1, num_samples // (len(lambda_values) * len(time_steps)))

                for i in range(step_samples):
                    # Normalized random initial pattern.
                    initial_pattern = np.random.randn(H, W).astype(np.float32)
                    initial_pattern = (initial_pattern - initial_pattern.mean()) / initial_pattern.std()

                    # Exponential decay after `time_step` steps.
                    decayed_pattern = initial_pattern * (lambda_val ** time_step)

                    # Accumulation noise, stronger for less persistent membranes.
                    noise_level = 0.01 * (1 - lambda_val)
                    noise = np.random.normal(0, noise_level, (H, W)).astype(np.float32)
                    decayed_pattern += noise

                    # How much signal energy survives, and how correlated it remains.
                    energy_retention = np.mean(decayed_pattern ** 2) / np.mean(initial_pattern ** 2)
                    correlation = np.corrcoef(initial_pattern.flatten(), decayed_pattern.flatten())[0, 1]

                    sample = {
                        "id": f"persistence_l{lambda_val}_t{time_step}_{i:06d}",
                        "key_pattern": initial_pattern.tolist(),
                        "value_pattern": decayed_pattern.tolist(),
                        "pattern_type": "persistence_decay",
                        "persistence_lambda": float(lambda_val),
                        "time_step": int(time_step),
                        "H": H,
                        "W": W,
                        "energy_retention": float(energy_retention),
                        "temporal_correlation": float(correlation if not np.isnan(correlation) else 0.0),
                        "category": "persistence_trace",
                        # None-padded fields keep one schema across all categories.
                        "fidelity_psnr": None,
                        "orthogonality": None,
                        "compressibility": None,
                        "interference_rms": None,
                        "codebook_type": None,
                        "capacity_load": None,
                        "L": None,
                        "K": None,
                        "reconstruction_error": None,
                        "reconstructed_pattern": None,
                        "codebook_matrix": None
                    }
                    samples.append(sample)

        return samples

    def _generate_visual_pattern(self, pattern_type: str, H: int, W: int, is_key: bool = True) -> np.ndarray:
        """Generate visual patterns for different types."""
        if pattern_type == "mnist_digits":
            # Simple synthetic digit-like glyphs (no actual MNIST download).
            digit = random.randint(0, 9)
            pattern = self._create_digit_pattern(digit, H, W)
            if not is_key:
                # Values are slightly perturbed versions of their keys.
                pattern = self._apply_simple_transform(pattern, "rotate_small")

        elif pattern_type == "geometric_shapes":
            shape = random.choice(["circle", "square", "triangle", "cross"])
            pattern = self._create_geometric_pattern(shape, H, W)
            if not is_key:
                pattern = self._apply_simple_transform(pattern, "scale")

        elif pattern_type == "noise_patterns":
            pattern = np.random.randn(H, W).astype(np.float32)
            pattern = (pattern - pattern.mean()) / pattern.std()
            if not is_key:
                pattern = pattern + 0.1 * np.random.randn(H, W)

        else:
            # Fallback for edge_features, texture_patches, sparse_dots.
            pattern = np.random.uniform(-1, 1, (H, W)).astype(np.float32)

        return pattern

    def _generate_synthetic_map(self, map_type: str, H: int, W: int, seed: int) -> np.ndarray:
        """Generate synthetic spatial maps."""
        np.random.seed(seed)

        if map_type == "gaussian_fields":
            # Isotropic Gaussian bump with a randomized width.
            x, y = np.meshgrid(np.linspace(-2, 2, W), np.linspace(-2, 2, H))
            pattern = np.exp(-(x**2 + y**2) / (2 * (0.5 + random.random())**2))

        elif map_type == "spiral_patterns":
            # Radial sine wave twisted by the polar angle.
            x, y = np.meshgrid(np.linspace(-np.pi, np.pi, W), np.linspace(-np.pi, np.pi, H))
            r = np.sqrt(x**2 + y**2)
            theta = np.arctan2(y, x)
            pattern = np.sin(r * 3 + theta * random.randint(1, 5))

        elif map_type == "frequency_domains":
            # Separable sinusoidal grating with random integer frequencies.
            freq_x, freq_y = random.randint(1, 8), random.randint(1, 8)
            x, y = np.meshgrid(np.linspace(0, 2 * np.pi, W), np.linspace(0, 2 * np.pi, H))
            pattern = np.sin(freq_x * x) * np.cos(freq_y * y)

        else:
            # Fallback for cellular_automata, fractal_structures, gradient_maps.
            pattern = np.random.randn(H, W)

        # Normalize to zero mean, unit variance.
        pattern = (pattern - pattern.mean()) / (pattern.std() + 1e-7)
        return pattern.astype(np.float32)

    def _create_digit_pattern(self, digit: int, H: int, W: int) -> np.ndarray:
        """Create a simple digit-like pattern."""
        pattern = np.zeros((H, W), dtype=np.float32)

        # Rough glyph geometry, centered in the frame.
        h_center, w_center = H // 2, W // 2
        size = min(H, W) // 3

        if digit in [0, 6, 8, 9]:
            # Digits with a loop: draw a filled ellipse.
            y, x = np.ogrid[:H, :W]
            mask = ((x - w_center) ** 2 / size**2 + (y - h_center) ** 2 / size**2) <= 1
            pattern[mask] = 1.0

        if digit in [1, 4, 7]:
            # Digits with a dominant vertical stroke.
            pattern[h_center - size:h_center + size, w_center - 2:w_center + 2] = 1.0

        # Digits 2, 3, and 5 fall through as noise-only placeholders.
        # Add light noise so identical digits are not byte-identical.
        noise = 0.1 * np.random.randn(H, W)
        pattern = np.clip(pattern + noise, -1, 1)

        return pattern

    def _create_geometric_pattern(self, shape: str, H: int, W: int) -> np.ndarray:
        """Create geometric shape patterns."""
        pattern = np.zeros((H, W), dtype=np.float32)
        center_h, center_w = H // 2, W // 2
        size = min(H, W) // 4

        if shape == "circle":
            y, x = np.ogrid[:H, :W]
            mask = ((x - center_w) ** 2 + (y - center_h) ** 2) <= size**2
            pattern[mask] = 1.0

        elif shape == "square":
            pattern[center_h - size:center_h + size, center_w - size:center_w + size] = 1.0

        elif shape == "triangle":
            # Filled upright triangle: rows widen linearly from the apex.
            # (Previously unhandled, which silently produced blank patterns.)
            for row in range(2 * size):
                half_width = max(1, row // 2)
                r = center_h - size + row
                if 0 <= r < H:
                    pattern[r, max(0, center_w - half_width):min(W, center_w + half_width)] = 1.0

        elif shape == "cross":
            pattern[center_h - size:center_h + size, center_w - 3:center_w + 3] = 1.0
            pattern[center_h - 3:center_h + 3, center_w - size:center_w + size] = 1.0

        return pattern

    def _apply_simple_transform(self, pattern: np.ndarray, transform: str) -> np.ndarray:
        """Apply simple transformations to patterns."""
        if transform == "rotate_small":
            # Approximate a small rotation with a small circular shift.
            return np.roll(pattern, random.randint(-2, 2), axis=random.randint(0, 1))
        elif transform == "scale":
            # Random amplitude scaling in [0.8, 1.2].
            return pattern * (0.8 + 0.4 * random.random())
        else:
            return pattern

    def _apply_map_transform(self, key_map: np.ndarray, value_map: np.ndarray, map_type: str) -> np.ndarray:
        """Apply a transformation relationship between key and value maps."""
        if map_type == "gaussian_fields":
            # Value is mostly the key with some independent structure mixed in.
            return 0.7 * key_map + 0.3 * value_map
        elif map_type == "spiral_patterns":
            # Value is a small horizontal circular shift of the key.
            return np.roll(key_map, random.randint(-3, 3), axis=1)
        else:
            # Default: strongly key-correlated blend.
            return 0.8 * key_map + 0.2 * value_map

    def _compute_psnr(self, pattern1: np.ndarray, pattern2: np.ndarray) -> float:
        """Compute Peak Signal-to-Noise Ratio between two patterns."""
        mse = np.mean((pattern1 - pattern2) ** 2)
        if mse == 0:
            return float('inf')
        # Use the largest absolute value as the signal peak; the epsilon guards
        # against near-zero inputs making log10 blow up.
        max_val = max(np.max(np.abs(pattern1)), np.max(np.abs(pattern2)), 1e-7)
        psnr = 20 * np.log10(max_val / np.sqrt(mse))
        return float(psnr)
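
    # Quick intuition (illustrative comment only): two independent unit-variance
    # fields give MSE close to 2, so PSNR lands near 20*log10(max_val / sqrt(2)),
    # typically a single-digit dB value, well under the 20 dB
    # "min_fidelity_psnr" threshold in self.config, as expected for unrelated
    # patterns.
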
    def _compute_orthogonality(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Compute an orthogonality score between two vectors (1 = orthogonal)."""
        vec1_norm = vec1 / (np.linalg.norm(vec1) + 1e-7)
        vec2_norm = vec2 / (np.linalg.norm(vec2) + 1e-7)
        # |cosine similarity| is 0 for orthogonal vectors, 1 for parallel ones.
        dot_product = np.abs(np.dot(vec1_norm, vec2_norm))
        return float(1.0 - dot_product)

    def _compute_gzip_ratio(self, pattern: np.ndarray) -> float:
        """Compute compressibility as the gzip ratio of the quantized pattern."""
        # Rescale to [0, 255] before quantizing; casting negative floats
        # straight to uint8 would wrap around and corrupt the measurement.
        lo, hi = pattern.min(), pattern.max()
        scaled = (pattern - lo) / (hi - lo + 1e-7)
        pattern_bytes = (scaled * 255).astype(np.uint8).tobytes()
        compressed = gzip.compress(pattern_bytes)
        return len(compressed) / len(pattern_bytes)

    def _compute_interference_rms(self, patterns: List[np.ndarray], target: np.ndarray) -> float:
        """Compute RMS interference from multiple patterns."""
        if not patterns:
            return 0.0

        # Everything except the target (patterns[0]) counts as interference.
        interference = np.zeros_like(target)
        for p in patterns[1:]:
            interference += p

        return float(np.sqrt(np.mean(interference ** 2)))

    def _compute_pattern_orthogonality(self, patterns: List[np.ndarray]) -> float:
        """Compute average orthogonality between patterns."""
        if len(patterns) < 2:
            return 1.0

        # Sample nearby pairs only (at most 4 partners per pattern) to keep
        # the cost roughly linear in the number of patterns.
        orthogonalities = []
        for i in range(len(patterns)):
            for j in range(i + 1, min(i + 5, len(patterns))):
                orth = self._compute_orthogonality(patterns[i].flatten(), patterns[j].flatten())
                orthogonalities.append(orth)

        return float(np.mean(orthogonalities)) if orthogonalities else 1.0

    def _generate_codebook(self, codebook_type: str, L: int, K: int, seed: int) -> np.ndarray:
        """Generate a codebook matrix of the requested type."""
        np.random.seed(seed)

        if codebook_type == "hadamard" and L <= 64 and K <= 64:
            # Random +/-1 sign matrix as a Hadamard-style stand-in; a true
            # Hadamard construction would require power-of-two dimensions.
            codebook = np.random.choice([-1, 1], size=(L, K))

        elif codebook_type == "random_orthogonal":
            # Orthonormalize a Gaussian matrix via QR. When L < K only the
            # rows can be orthonormal, so orthogonalize the transpose instead.
            random_matrix = np.random.randn(L, K)
            if L >= K:
                q, _ = np.linalg.qr(random_matrix)
                codebook = q[:, :K]
            else:
                q, _ = np.linalg.qr(random_matrix.T)
                codebook = q[:, :L].T

        else:
            # Fallback for dct_basis, wavelet_basis, learned_sparse:
            # variance-scaled Gaussian codes.
            codebook = np.random.randn(L, K) / np.sqrt(L)

        return codebook.astype(np.float32)

    def _simulate_membrane_operation(self, codebook: np.ndarray, key: np.ndarray,
                                     value: np.ndarray, H: int, W: int) -> Tuple[np.ndarray, np.ndarray]:
        """Simulate a membrane write and read operation.

        Simplified: the key is accepted for interface parity, but the value is
        always written to slot k = 0 and only the first 16 layers are used.
        """
        L, K = codebook.shape

        # Write: project the value onto each layer through the slot-0 code.
        alpha = 1.0  # write gain
        membrane = np.zeros((L, H, W), dtype=np.float32)
        for l in range(min(L, 16)):
            membrane[l] = alpha * codebook[l, 0] * value

        # Read: accumulate the layers weighted by the same slot-0 code.
        read_result = np.zeros((H, W), dtype=np.float32)
        for l in range(min(L, 16)):
            read_result += codebook[l, 0] * membrane[l]

        # Rectify the readout (ReLU).
        read_result = np.maximum(0, read_result)

        return membrane, read_result.astype(np.float32)
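
    # For reference, a slot-addressed membrane write/read generalizes the
    # simplification above (hedged sketch; the exact WrinkleBrane formulation
    # may differ):
    #   write: M[l] += alpha * C[l, k] * V     for each layer l, slot k
    #   read:  V_hat = sum_l C[l, k] * M[l]
    # With orthonormal codebook columns, contributions from other stored slots
    # cancel in the readout, so V_hat approaches V as orthogonality improves.
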
    def _compute_codebook_orthogonality(self, codebook: np.ndarray) -> float:
        """Compute an orthogonality measure of the codebook in (0, 1]."""
        # Gram matrix of the columns; exactly identity for orthonormal codes.
        gram = codebook.T @ codebook

        # Distance from the identity, measured in Frobenius norm.
        identity = np.eye(gram.shape[0])
        frobenius_dist = np.linalg.norm(gram - identity, 'fro')

        # Map the distance to a bounded score: 1.0 means perfectly orthonormal.
        return float(1.0 / (1.0 + frobenius_dist / gram.shape[0]))
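
    # Sanity check (hedged, comments only): an exactly orthonormal codebook
    # such as np.eye(L) gives gram == I, frobenius_dist == 0, and a perfect
    # score of 1.0; random Gaussian codebooks score strictly lower, which is
    # the gap the orthogonality benchmarks above are designed to expose.
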
    def build_complete_dataset(self) -> DatasetDict:
        """Build the complete WrinkleBrane dataset."""
        print("Building WrinkleBrane dataset...")

        all_samples = []

        # Target mix: 8k visual + 5k synthetic + 4k interference
        # + 2k orthogonality + 1k persistence = 20k samples.
        print("Generating visual memory pairs...")
        visual_samples = self.generate_visual_memory_pairs(8000)
        all_samples.extend(visual_samples)

        print("Generating synthetic maps...")
        map_samples = self.generate_synthetic_maps(5000)
        all_samples.extend(map_samples)

        print("Generating interference studies...")
        interference_samples = self.generate_interference_studies(4000)
        all_samples.extend(interference_samples)

        print("Generating orthogonality benchmarks...")
        orthogonal_samples = self.generate_orthogonality_benchmarks(2000)
        all_samples.extend(orthogonal_samples)

        print("Generating persistence traces...")
        persistence_samples = self.generate_persistence_traces(1000)
        all_samples.extend(persistence_samples)

        # Shuffle before splitting so every split mixes all categories.
        random.shuffle(all_samples)

        # 80/10/10 train/validation/test split.
        total = len(all_samples)
        train_split = int(0.8 * total)
        val_split = int(0.9 * total)

        train_data = all_samples[:train_split]
        val_data = all_samples[train_split:val_split]
        test_data = all_samples[val_split:]

        dataset_dict = DatasetDict({
            'train': Dataset.from_list(train_data),
            'validation': Dataset.from_list(val_data),
            'test': Dataset.from_list(test_data)
        })

        print(f"Dataset built: {len(train_data)} train, {len(val_data)} val, {len(test_data)} test")
        return dataset_dict

    def upload_to_huggingface(self, dataset: DatasetDict, private: bool = True) -> str:
        """Upload dataset to HuggingFace Hub."""
        print(f"Uploading to HuggingFace: {self.repo_id}")

        try:
            # Create the dataset repo if it does not already exist.
            create_repo(
                repo_id=self.repo_id,
                repo_type="dataset",
                private=private,
                exist_ok=True,
                token=self.hf_token
            )

            # Sidecar metadata describing the dataset contents.
            dataset_info = {
                "dataset_info": self.config,
                "splits": {
                    "train": len(dataset["train"]),
                    "validation": len(dataset["validation"]),
                    "test": len(dataset["test"])
                },
                "features": {
                    "id": "string",
                    "key_pattern": "2D array of floats (H x W)",
                    "value_pattern": "2D array of floats (H x W)",
                    "pattern_type": "string",
                    "H": "integer (height)",
                    "W": "integer (width)",
                    "category": "string",
                    "optional_metrics": "various floats for specific sample types"
                },
                "usage_notes": [
                    "Optimized for WrinkleBrane associative memory training",
                    "Key-value pairs for membrane storage and retrieval",
                    "Includes interference studies and capacity analysis",
                    "Supports orthogonality optimization research"
                ]
            }

            # Push the splits themselves.
            dataset.push_to_hub(
                repo_id=self.repo_id,
                token=self.hf_token,
                private=private
            )

            # Write the metadata to a temp file, upload it, then clean up.
            # Uploading after the `with` block guarantees the JSON is flushed.
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
                json.dump(dataset_info, f, indent=2)
                info_path = f.name
            self.api.upload_file(
                path_or_fileobj=info_path,
                path_in_repo="dataset_info.json",
                repo_id=self.repo_id,
                repo_type="dataset",
                token=self.hf_token
            )
            os.unlink(info_path)

            print(f"Dataset uploaded successfully to: https://huggingface.co/datasets/{self.repo_id}")
            return f"https://huggingface.co/datasets/{self.repo_id}"

        except Exception as e:
            print(f"Upload failed: {e}")
            raise
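

# Hedged convenience sketch (not part of the original upload pipeline): reload
# an uploaded split and recover stored patterns as numpy arrays. The `token`
# keyword assumes a recent `datasets` release; older versions spelled it
# `use_auth_token`.
def load_wrinklebrane_split(repo_id: str, split: str = "train", hf_token=None):
    """Yield (id, key, value) tuples from one split, as numpy arrays."""
    from datasets import load_dataset

    ds = load_dataset(repo_id, split=split, token=hf_token)
    for sample in ds:
        key = np.asarray(sample["key_pattern"], dtype=np.float32)
        value = np.asarray(sample["value_pattern"], dtype=np.float32)
        yield sample["id"], key, value


# Example: for sid, key, value in load_wrinklebrane_split("user/WrinkleBrane"): ...
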
def create_wrinklebrane_dataset(hf_token: str, repo_id: str = "WrinkleBrane") -> str:
    """
    Convenience function to create and upload WrinkleBrane dataset.

    Args:
        hf_token: HuggingFace access token
        repo_id: Dataset repository ID

    Returns:
        URL to the uploaded dataset
    """
    builder = WrinkleBraneDatasetBuilder(hf_token, repo_id)
    dataset = builder.build_complete_dataset()
    return builder.upload_to_huggingface(dataset, private=True)
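

if __name__ == "__main__":
    # Minimal usage sketch: assumes an HF_TOKEN environment variable holding a
    # write-scoped HuggingFace token; adjust repo_id before running for real.
    token = os.environ.get("HF_TOKEN")
    if token:
        url = create_wrinklebrane_dataset(token, repo_id="WrinkleBrane")
        print(f"Dataset available at: {url}")
    else:
        print("Set HF_TOKEN to build and upload the dataset.")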