""" WrinkleBrane Dataset Builder & HuggingFace Integration Creates curated datasets optimized for associative memory training with membrane storage, interference studies, and orthogonality benchmarks. """ import os import json import gzip import random import math from typing import List, Dict, Any, Optional, Tuple, Union from pathlib import Path from datetime import datetime import tempfile import torch import numpy as np from datasets import Dataset, DatasetDict from huggingface_hub import HfApi, login, create_repo class WrinkleBraneDatasetBuilder: """ Comprehensive dataset builder for WrinkleBrane associative memory training. Generates: - Key-value pairs for associative memory tasks - Visual patterns (MNIST-style, geometric shapes) - Interference benchmark sequences - Orthogonality optimization data - Persistence decay studies """ def __init__(self, hf_token: str, repo_id: str = "WrinkleBrane"): """Initialize with HuggingFace credentials.""" self.hf_token = hf_token self.repo_id = repo_id self.api = HfApi() # Login to HuggingFace login(token=hf_token) # Dataset configuration self.config = { "version": "1.0.0", "created": datetime.now().isoformat(), "model_compatibility": "WrinkleBrane", "membrane_encoding": "2D_spatial_maps", "default_H": 64, "default_W": 64, "default_L": 64, # membrane layers "default_K": 64, # codebook size "total_samples": 20000, "quality_thresholds": { "min_fidelity_psnr": 20.0, "max_interference_rms": 0.1, "min_orthogonality": 0.8 } } def generate_visual_memory_pairs(self, num_samples: int = 5000, H: int = 64, W: int = 64) -> List[Dict]: """Generate visual key-value pairs for associative memory.""" samples = [] visual_types = [ "mnist_digits", "geometric_shapes", "noise_patterns", "edge_features", "texture_patches", "sparse_dots" ] for i in range(num_samples): visual_type = random.choice(visual_types) # Generate key pattern key_pattern = self._generate_visual_pattern(visual_type, H, W, is_key=True) # Generate corresponding value pattern value_pattern = self._generate_visual_pattern(visual_type, H, W, is_key=False) # Compute quality metrics fidelity_psnr = self._compute_psnr(key_pattern, value_pattern) orthogonality = self._compute_orthogonality(key_pattern.flatten(), value_pattern.flatten()) compressibility = self._compute_gzip_ratio(key_pattern) sample = { "id": f"visual_{visual_type}_{i:06d}", "key_pattern": key_pattern.tolist(), "value_pattern": value_pattern.tolist(), "pattern_type": visual_type, "H": H, "W": W, "fidelity_psnr": float(fidelity_psnr), "orthogonality": float(orthogonality), "compressibility": float(compressibility), "category": "visual_memory", # Consistent schema fields "interference_rms": None, "persistence_lambda": None, "codebook_type": None, "capacity_load": None, "time_step": None, "energy_retention": None, "temporal_correlation": None, "L": None, "K": None, "reconstruction_error": None, "reconstructed_pattern": None, "codebook_matrix": None } samples.append(sample) return samples def generate_synthetic_maps(self, num_samples: int = 3000, H: int = 64, W: int = 64) -> List[Dict]: """Generate synthetic spatial pattern mappings.""" samples = [] map_types = [ "gaussian_fields", "spiral_patterns", "frequency_domains", "cellular_automata", "fractal_structures", "gradient_maps" ] for i in range(num_samples): map_type = random.choice(map_types) # Generate synthetic key-value mapping key_map = self._generate_synthetic_map(map_type, H, W, seed=i*2) value_map = self._generate_synthetic_map(map_type, H, W, seed=i*2+1) # Apply transformation 
            value_map = self._apply_map_transform(key_map, value_map, map_type)

            # Compute metrics
            fidelity_psnr = self._compute_psnr(key_map, value_map)
            orthogonality = self._compute_orthogonality(key_map.flatten(), value_map.flatten())
            compressibility = self._compute_gzip_ratio(key_map)

            sample = {
                "id": f"synthetic_{map_type}_{i:06d}",
                "key_pattern": key_map.tolist(),
                "value_pattern": value_map.tolist(),
                "pattern_type": map_type,
                "H": H,
                "W": W,
                "fidelity_psnr": float(fidelity_psnr),
                "orthogonality": float(orthogonality),
                "compressibility": float(compressibility),
                "category": "synthetic_maps",
                # Consistent schema fields
                "interference_rms": None,
                "persistence_lambda": None,
                "codebook_type": None,
                "capacity_load": None,
                "time_step": None,
                "energy_retention": None,
                "temporal_correlation": None,
                "L": None,
                "K": None,
                "reconstruction_error": None,
                "reconstructed_pattern": None,
                "codebook_matrix": None
            }
            samples.append(sample)

        return samples

    def generate_interference_studies(self, num_samples: int = 2000,
                                      H: int = 64, W: int = 64) -> List[Dict]:
        """Generate data for studying memory interference and capacity limits."""
        samples = []

        # Test different capacity loads
        capacity_loads = [0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]

        for load in capacity_loads:
            load_samples = num_samples // len(capacity_loads)  # Distribute samples evenly across loads

            for i in range(load_samples):
                # Generate multiple overlapping patterns to study interference
                num_patterns = max(1, int(64 * load))  # Scale with capacity load

                patterns = []
                for p in range(min(num_patterns, 10)):  # Limit for memory
                    pattern = np.random.randn(H, W).astype(np.float32)
                    pattern = (pattern - pattern.mean()) / pattern.std()  # Normalize
                    patterns.append(pattern)

                # Create composite pattern (sum of all patterns)
                composite = np.sum(patterns, axis=0) / len(patterns)
                target = patterns[0] if patterns else composite  # Try to retrieve first pattern

                # Compute interference metrics
                interference_rms = self._compute_interference_rms(patterns, target)
                fidelity_psnr = self._compute_psnr(composite, target)
                orthogonality = self._compute_pattern_orthogonality(patterns)

                sample = {
                    "id": f"interference_load_{load}_{i:06d}",
                    "key_pattern": composite.tolist(),
                    "value_pattern": target.tolist(),
                    "pattern_type": "interference_test",
                    "H": H,
                    "W": W,
                    "capacity_load": float(load),
                    "interference_rms": float(interference_rms),
                    "fidelity_psnr": float(fidelity_psnr),
                    "orthogonality": float(orthogonality),
                    "category": "interference_study",
                    # Consistent schema fields
                    "compressibility": None,
                    "persistence_lambda": None,
                    "codebook_type": None,
                    "time_step": None,
                    "energy_retention": None,
                    "temporal_correlation": None,
                    "L": None,
                    "K": None,
                    "reconstruction_error": None,
                    "reconstructed_pattern": None,
                    "codebook_matrix": None
                }
                samples.append(sample)

        return samples

    def generate_orthogonality_benchmarks(self, num_samples: int = 1500,
                                          L: int = 64, K: int = 64) -> List[Dict]:
        """Generate codebook optimization data for orthogonality studies."""
        samples = []

        codebook_types = [
            "hadamard", "random_orthogonal", "dct_basis",
            "wavelet_basis", "learned_sparse"
        ]

        for codebook_type in codebook_types:
            type_samples = num_samples // len(codebook_types)

            for i in range(type_samples):
                # Generate codebook matrix C[L, K]
                codebook = self._generate_codebook(codebook_type, L, K, seed=i)

                # Test multiple read/write operations
                H, W = 64, 64
                test_key = np.random.randn(H, W).astype(np.float32)
                test_value = np.random.randn(H, W).astype(np.float32)

                # Simulate membrane write and read
                written_membrane, read_result = self._simulate_membrane_operation(
                    codebook, test_key, test_value, H, W
                )

                # Compute orthogonality metrics
                orthogonality = self._compute_codebook_orthogonality(codebook)
                reconstruction_error = np.mean((test_value - read_result) ** 2)

                sample = {
                    "id": f"orthogonal_{codebook_type}_{i:06d}",
                    "key_pattern": test_key.tolist(),
                    "value_pattern": test_value.tolist(),
                    "reconstructed_pattern": read_result.tolist(),
                    "codebook_matrix": codebook.tolist(),
                    "pattern_type": "orthogonality_test",
                    "codebook_type": codebook_type,
                    "H": H,
                    "W": W,
                    "L": L,
                    "K": K,
                    "orthogonality": float(orthogonality),
                    "reconstruction_error": float(reconstruction_error),
                    "category": "orthogonality_benchmark",
                    # Consistent schema fields
                    "fidelity_psnr": None,
                    "compressibility": None,
                    "interference_rms": None,
                    "persistence_lambda": None,
                    "capacity_load": None,
                    "time_step": None,
                    "energy_retention": None,
                    "temporal_correlation": None
                }
                samples.append(sample)

        return samples

    def generate_persistence_traces(self, num_samples: int = 1000,
                                    H: int = 64, W: int = 64) -> List[Dict]:
        """Generate temporal decay studies for persistence analysis."""
        samples = []

        # Test different decay rates
        lambda_values = [0.95, 0.97, 0.98, 0.99, 0.995]
        time_steps = [1, 5, 10, 20, 50, 100]

        for lambda_val in lambda_values:
            for time_step in time_steps:
                step_samples = max(1, num_samples // (len(lambda_values) * len(time_steps)))

                for i in range(step_samples):
                    # Generate initial pattern
                    initial_pattern = np.random.randn(H, W).astype(np.float32)
                    initial_pattern = (initial_pattern - initial_pattern.mean()) / initial_pattern.std()

                    # Simulate temporal decay: M_{t+1} = Ξ» * M_t
                    decayed_pattern = initial_pattern * (lambda_val ** time_step)

                    # Add noise for realism
                    noise_level = 0.01 * (1 - lambda_val)  # More noise for faster decay
                    noise = np.random.normal(0, noise_level, (H, W)).astype(np.float32)
                    decayed_pattern += noise

                    # Compute persistence metrics
                    energy_retention = np.mean(decayed_pattern ** 2) / np.mean(initial_pattern ** 2)
                    correlation = np.corrcoef(initial_pattern.flatten(), decayed_pattern.flatten())[0, 1]

                    sample = {
                        "id": f"persistence_l{lambda_val}_t{time_step}_{i:06d}",
                        "key_pattern": initial_pattern.tolist(),
                        "value_pattern": decayed_pattern.tolist(),
                        "pattern_type": "persistence_decay",
                        "persistence_lambda": float(lambda_val),
                        "time_step": int(time_step),
                        "H": H,
                        "W": W,
                        "energy_retention": float(energy_retention),
                        "temporal_correlation": float(correlation if not np.isnan(correlation) else 0.0),
                        "category": "persistence_trace",
                        # Consistent schema fields - set all to None for consistency
                        "fidelity_psnr": None,
                        "orthogonality": None,
                        "compressibility": None,
                        "interference_rms": None,
                        "codebook_type": None,
                        "capacity_load": None,
                        # Additional fields that other samples might have
                        "L": None,
                        "K": None,
                        "reconstruction_error": None,
                        "reconstructed_pattern": None,
                        "codebook_matrix": None
                    }
                    samples.append(sample)

        return samples

    def _generate_visual_pattern(self, pattern_type: str, H: int, W: int,
                                 is_key: bool = True) -> np.ndarray:
        """Generate visual patterns for different types."""
        if pattern_type == "mnist_digits":
            # Simple digit-like patterns
            digit = random.randint(0, 9)
            pattern = self._create_digit_pattern(digit, H, W)
            if not is_key:
                # For value, create slightly transformed version
                pattern = self._apply_simple_transform(pattern, "rotate_small")

        elif pattern_type == "geometric_shapes":
            shape = random.choice(["circle", "square", "triangle", "cross"])
            pattern = self._create_geometric_pattern(shape, H, W)
            if not is_key:
                pattern = self._apply_simple_transform(pattern, "scale")

        elif pattern_type == "noise_patterns":
"noise_patterns": pattern = np.random.randn(H, W).astype(np.float32) pattern = (pattern - pattern.mean()) / pattern.std() if not is_key: pattern = pattern + 0.1 * np.random.randn(H, W) else: # Default random pattern pattern = np.random.uniform(-1, 1, (H, W)).astype(np.float32) return pattern def _generate_synthetic_map(self, map_type: str, H: int, W: int, seed: int) -> np.ndarray: """Generate synthetic spatial maps.""" np.random.seed(seed) if map_type == "gaussian_fields": # Random Gaussian field x, y = np.meshgrid(np.linspace(-2, 2, W), np.linspace(-2, 2, H)) pattern = np.exp(-(x**2 + y**2) / (2 * (0.5 + random.random())**2)) elif map_type == "spiral_patterns": # Spiral pattern x, y = np.meshgrid(np.linspace(-np.pi, np.pi, W), np.linspace(-np.pi, np.pi, H)) r = np.sqrt(x**2 + y**2) theta = np.arctan2(y, x) pattern = np.sin(r * 3 + theta * random.randint(1, 5)) elif map_type == "frequency_domains": # Frequency domain pattern freq_x, freq_y = random.randint(1, 8), random.randint(1, 8) x, y = np.meshgrid(np.linspace(0, 2*np.pi, W), np.linspace(0, 2*np.pi, H)) pattern = np.sin(freq_x * x) * np.cos(freq_y * y) else: # Default random field pattern = np.random.randn(H, W) # Normalize pattern = (pattern - pattern.mean()) / (pattern.std() + 1e-7) return pattern.astype(np.float32) def _create_digit_pattern(self, digit: int, H: int, W: int) -> np.ndarray: """Create simple digit-like pattern.""" pattern = np.zeros((H, W), dtype=np.float32) # Simple digit patterns h_center, w_center = H // 2, W // 2 size = min(H, W) // 3 if digit in [0, 6, 8, 9]: # Draw circle/oval y, x = np.ogrid[:H, :W] mask = ((x - w_center) ** 2 / size**2 + (y - h_center) ** 2 / size**2) <= 1 pattern[mask] = 1.0 if digit in [1, 4, 7]: # Draw vertical line pattern[h_center-size:h_center+size, w_center-2:w_center+2] = 1.0 # Add some randomization noise = 0.1 * np.random.randn(H, W) pattern = np.clip(pattern + noise, -1, 1) return pattern def _create_geometric_pattern(self, shape: str, H: int, W: int) -> np.ndarray: """Create geometric shape patterns.""" pattern = np.zeros((H, W), dtype=np.float32) center_h, center_w = H // 2, W // 2 size = min(H, W) // 4 if shape == "circle": y, x = np.ogrid[:H, :W] mask = ((x - center_w) ** 2 + (y - center_h) ** 2) <= size**2 pattern[mask] = 1.0 elif shape == "square": pattern[center_h-size:center_h+size, center_w-size:center_w+size] = 1.0 elif shape == "cross": pattern[center_h-size:center_h+size, center_w-3:center_w+3] = 1.0 pattern[center_h-3:center_h+3, center_w-size:center_w+size] = 1.0 return pattern def _apply_simple_transform(self, pattern: np.ndarray, transform: str) -> np.ndarray: """Apply simple transformations to patterns.""" if transform == "rotate_small": # Small rotation (simplified) return np.roll(pattern, random.randint(-2, 2), axis=random.randint(0, 1)) elif transform == "scale": # Simple scaling via interpolation approximation return pattern * (0.8 + 0.4 * random.random()) else: return pattern def _apply_map_transform(self, key_map: np.ndarray, value_map: np.ndarray, map_type: str) -> np.ndarray: """Apply transformation relationship between key and value maps.""" if map_type == "gaussian_fields": # Value is blurred version of key return 0.7 * key_map + 0.3 * value_map elif map_type == "spiral_patterns": # Value is phase-shifted version return np.roll(key_map, random.randint(-3, 3), axis=1) else: # Default: slightly correlated return 0.8 * key_map + 0.2 * value_map def _compute_psnr(self, pattern1: np.ndarray, pattern2: np.ndarray) -> float: """Compute Peak Signal-to-Noise 
Ratio.""" mse = np.mean((pattern1 - pattern2) ** 2) if mse == 0: return float('inf') max_val = max(np.max(pattern1), np.max(pattern2)) psnr = 20 * np.log10(max_val / np.sqrt(mse)) return psnr def _compute_orthogonality(self, vec1: np.ndarray, vec2: np.ndarray) -> float: """Compute orthogonality score between two vectors.""" vec1_norm = vec1 / (np.linalg.norm(vec1) + 1e-7) vec2_norm = vec2 / (np.linalg.norm(vec2) + 1e-7) dot_product = np.abs(np.dot(vec1_norm, vec2_norm)) orthogonality = 1.0 - dot_product # 1 = orthogonal, 0 = parallel return orthogonality def _compute_gzip_ratio(self, pattern: np.ndarray) -> float: """Compute compressibility using gzip ratio.""" # Convert to bytes pattern_bytes = (pattern * 255).astype(np.uint8).tobytes() compressed = gzip.compress(pattern_bytes) ratio = len(compressed) / len(pattern_bytes) return ratio def _compute_interference_rms(self, patterns: List[np.ndarray], target: np.ndarray) -> float: """Compute RMS interference from multiple patterns.""" if not patterns: return 0.0 # Sum all patterns except target interference = np.zeros_like(target) for p in patterns[1:]: # Skip first pattern (target) interference += p rms = np.sqrt(np.mean(interference ** 2)) return rms def _compute_pattern_orthogonality(self, patterns: List[np.ndarray]) -> float: """Compute average orthogonality between patterns.""" if len(patterns) < 2: return 1.0 orthogonalities = [] for i in range(len(patterns)): for j in range(i + 1, min(i + 5, len(patterns))): # Limit comparisons orth = self._compute_orthogonality(patterns[i].flatten(), patterns[j].flatten()) orthogonalities.append(orth) return np.mean(orthogonalities) if orthogonalities else 1.0 def _generate_codebook(self, codebook_type: str, L: int, K: int, seed: int) -> np.ndarray: """Generate codebook matrix for different types.""" np.random.seed(seed) if codebook_type == "hadamard" and L <= 64 and K <= 64: # Simple Hadamard-like matrix (for small sizes) codebook = np.random.choice([-1, 1], size=(L, K)) elif codebook_type == "random_orthogonal": # Random orthogonal matrix random_matrix = np.random.randn(L, K) if L >= K: q, _ = np.linalg.qr(random_matrix) codebook = q[:, :K] else: codebook = random_matrix else: # Default random matrix codebook = np.random.randn(L, K) / np.sqrt(L) return codebook.astype(np.float32) def _simulate_membrane_operation(self, codebook: np.ndarray, key: np.ndarray, value: np.ndarray, H: int, W: int) -> Tuple[np.ndarray, np.ndarray]: """Simulate membrane write and read operation.""" L, K = codebook.shape # Simulate write: M += alpha * C[:, k] βŠ— V # For simplicity, use first codebook column alpha = 1.0 membrane = np.zeros((L, H, W)) # Write operation (simplified) for l in range(min(L, 16)): # Limit for memory membrane[l] = codebook[l, 0] * value # Read operation: Y = ReLU(einsum('lhw,lk->khw', M, C)) # Simplified readout read_result = np.zeros((H, W)) for l in range(min(L, 16)): read_result += codebook[l, 0] * membrane[l] # Apply ReLU read_result = np.maximum(0, read_result) return membrane, read_result.astype(np.float32) def _compute_codebook_orthogonality(self, codebook: np.ndarray) -> float: """Compute orthogonality measure of codebook.""" # Compute Gram matrix G = C^T C gram = codebook.T @ codebook # Orthogonality measure: how close to identity matrix identity = np.eye(gram.shape[0]) frobenius_dist = np.linalg.norm(gram - identity, 'fro') # Normalize by matrix size orthogonality = 1.0 / (1.0 + frobenius_dist / gram.shape[0]) return orthogonality def build_complete_dataset(self) -> DatasetDict: """Build 
        print("🧠 Building WrinkleBrane Dataset...")

        all_samples = []

        # 1. Visual memory pairs (40% of dataset)
        print("πŸ‘οΈ Generating visual memory pairs...")
        visual_samples = self.generate_visual_memory_pairs(8000)
        all_samples.extend(visual_samples)

        # 2. Synthetic maps (25% of dataset)
        print("πŸ—ΊοΈ Generating synthetic maps...")
        map_samples = self.generate_synthetic_maps(5000)
        all_samples.extend(map_samples)

        # 3. Interference studies (20% of dataset)
        print("⚑ Generating interference studies...")
        interference_samples = self.generate_interference_studies(4000)
        all_samples.extend(interference_samples)

        # 4. Orthogonality benchmarks (10% of dataset)
        print("πŸ“ Generating orthogonality benchmarks...")
        orthogonal_samples = self.generate_orthogonality_benchmarks(2000)
        all_samples.extend(orthogonal_samples)

        # 5. Persistence traces (5% of dataset)
        print("⏰ Generating persistence traces...")
        persistence_samples = self.generate_persistence_traces(1000)
        all_samples.extend(persistence_samples)

        # Split into train/validation/test
        random.shuffle(all_samples)
        total = len(all_samples)
        train_split = int(0.8 * total)
        val_split = int(0.9 * total)

        train_data = all_samples[:train_split]
        val_data = all_samples[train_split:val_split]
        test_data = all_samples[val_split:]

        # Create HuggingFace datasets
        dataset_dict = DatasetDict({
            'train': Dataset.from_list(train_data),
            'validation': Dataset.from_list(val_data),
            'test': Dataset.from_list(test_data)
        })

        print(f"βœ… Dataset built: {len(train_data)} train, {len(val_data)} val, {len(test_data)} test")
        return dataset_dict

    def upload_to_huggingface(self, dataset: DatasetDict, private: bool = True) -> str:
        """Upload dataset to HuggingFace Hub."""
        print(f"🌐 Uploading to HuggingFace: {self.repo_id}")

        try:
            # Create repository
            create_repo(
                repo_id=self.repo_id,
                repo_type="dataset",
                private=private,
                exist_ok=True,
                token=self.hf_token
            )

            # Add dataset metadata
            dataset_info = {
                "dataset_info": self.config,
                "splits": {
                    "train": len(dataset["train"]),
                    "validation": len(dataset["validation"]),
                    "test": len(dataset["test"])
                },
                "features": {
                    "id": "string",
                    "key_pattern": "2D array of floats (H x W)",
                    "value_pattern": "2D array of floats (H x W)",
                    "pattern_type": "string",
                    "H": "integer (height)",
                    "W": "integer (width)",
                    "category": "string",
                    "optional_metrics": "various floats for specific sample types"
                },
                "usage_notes": [
                    "Optimized for WrinkleBrane associative memory training",
                    "Key-value pairs for membrane storage and retrieval",
                    "Includes interference studies and capacity analysis",
                    "Supports orthogonality optimization research"
                ]
            }

            # Push dataset with metadata
            dataset.push_to_hub(
                repo_id=self.repo_id,
                token=self.hf_token,
                private=private
            )

            # Upload additional metadata (write and close the temp file before uploading
            # so the JSON is fully flushed to disk)
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
                json.dump(dataset_info, f, indent=2)
                metadata_path = f.name

            self.api.upload_file(
                path_or_fileobj=metadata_path,
                path_in_repo="dataset_info.json",
                repo_id=self.repo_id,
                repo_type="dataset",
                token=self.hf_token
            )
            os.unlink(metadata_path)

            print(f"βœ… Dataset uploaded successfully to: https://huggingface.co/datasets/{self.repo_id}")
            return f"https://huggingface.co/datasets/{self.repo_id}"

        except Exception as e:
            print(f"❌ Upload failed: {e}")
            raise


def create_wrinklebrane_dataset(hf_token: str, repo_id: str = "WrinkleBrane") -> str:
    """
    Convenience function to create and upload WrinkleBrane dataset.

    Args:
        hf_token: HuggingFace access token
        repo_id: Dataset repository ID

    Returns:
        URL to the uploaded dataset
    """
    builder = WrinkleBraneDatasetBuilder(hf_token, repo_id)
    dataset = builder.build_complete_dataset()
    return builder.upload_to_huggingface(dataset, private=True)
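

# --- Hedged usage sketches (not part of the original module) ---
# The helper below is an illustrative, vectorized version of the write/read cycle that
# _simulate_membrane_operation only approximates. It follows the einsum form quoted in
# that method's comments: write M += alpha * C[:, k] βŠ— V and read
# Y = ReLU(einsum('lhw,lk->khw', M, C)). The function name and shapes are assumptions
# for demonstration, not WrinkleBrane's actual API.
def simulate_membrane_bank(codebook: np.ndarray, values: np.ndarray,
                           alpha: float = 1.0) -> Tuple[np.ndarray, np.ndarray]:
    """Write K value maps into an L-layer membrane and read them all back.

    codebook: C with shape (L, K); values: V with shape (K, H, W).
    Returns (membrane M of shape (L, H, W), readout Y of shape (K, H, W)).
    """
    # Superpose every slot: M[l] = alpha * sum_k C[l, k] * V[k]
    membrane = alpha * np.einsum('lk,khw->lhw', codebook, values)
    # Read every slot at once, then apply the ReLU nonlinearity
    readout = np.maximum(0.0, np.einsum('lhw,lk->khw', membrane, codebook))
    return membrane, readout


# Minimal end-to-end example, assuming the HuggingFace token is provided via the
# HF_TOKEN environment variable (an assumption; adapt to your own credential handling).
if __name__ == "__main__":
    token = os.environ.get("HF_TOKEN")
    if not token:
        raise SystemExit("Set HF_TOKEN to a HuggingFace access token before running.")
    # Builds all five sample categories (~20k samples) and pushes them to the Hub.
    url = create_wrinklebrane_dataset(hf_token=token, repo_id="WrinkleBrane")
    print(f"Dataset available at: {url}")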