# TCube_Merging/BetaMixture.py
import numpy as np
from scipy.special import betaln, logsumexp
from sklearn.cluster import KMeans


class BetaMixtureModel:
"""
Beta Mixture Model (Multivariate version).
Each dimension is modeled independently by a Beta distribution.
"""

    def __init__(self, n_mixtures=3, random_seed=1):
self.n_mixtures = n_mixtures
self.random_seed = random_seed
self.convergence = False

    def _init_clusters(self, data_matrix, init_round):
        """
        Initialize the mixture responsibilities either via k-means or uniformly at random.
        """
if self.method == "kmeans":
km = KMeans(
n_clusters=self.n_mixtures,
n_init=1,
random_state=self.random_seed + init_round
).fit(data_matrix)
resp_matrix = np.zeros((self.n_observations, self.n_mixtures))
resp_matrix[np.arange(self.n_observations), km.labels_] = 1
else:
            # Seed a local generator so the global NumPy RNG state is untouched
            rng = np.random.RandomState(self.random_seed + init_round)
            resp_matrix = rng.rand(self.n_observations, self.n_mixtures)
resp_matrix /= resp_matrix.sum(axis=1, keepdims=True)
        # Keep responsibilities strictly positive so the log below is finite
        resp_matrix += 10 * np.finfo(resp_matrix.dtype).eps
# Initialize beta parameters (alpha/beta for each dimension)
self.beta_params_ = np.zeros((self.n_mixtures, self.n_components * 2))
self._M_step(data_matrix, np.log(resp_matrix))

    def _calc_log_weights(self):
"""
Return log of current mixture weights.
"""
return np.log(self.mix_weights_)

    def _calc_mixture_log_probs(self, data_matrix, mixture_idx):
        """
        Compute the log-density of each observation under a single mixture
        (useful when parallelizing over mixtures).
        """
alpha_vec = self.beta_params_[mixture_idx, :self.n_components]
beta_vec = self.beta_params_[mixture_idx, self.n_components:]
beta_func_log = betaln(alpha_vec, beta_vec)
return (
(alpha_vec - 1) * np.log(data_matrix)
+ (beta_vec - 1) * np.log(1 - data_matrix)
- beta_func_log
).sum(axis=1)

    def _calc_log_probs_all_mixtures(self, data_matrix):
        """
        Return the unnormalized log-density of each observation under each mixture.
        """
        # Size the output from data_matrix rather than self.n_observations so
        # predict/predict_proba also work on data with a different number of rows
        log_prob = np.empty((data_matrix.shape[0], self.n_mixtures))
        for mix in range(self.n_mixtures):
            log_prob[:, mix] = self._calc_mixture_log_probs(data_matrix, mix)
        return log_prob

    def _calc_weighted_log_probs(self, data_matrix):
        """
        Return the log-densities plus the log mixture weights (the joint log-probabilities).
        """
return self._calc_log_probs_all_mixtures(data_matrix) + self._calc_log_weights()

    def _calc_log_resp_and_norm(self, data_matrix):
"""
Return (log_prob_norm, log_resp) for the E-step.
"""
weighted_lp = self._calc_weighted_log_probs(data_matrix)
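        # Bayes' rule in log space: log p(m | x) = log p(x, m) - log p(x),
        # where log p(x) is the logsumexp of the joint over mixtures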
lp_norm = logsumexp(weighted_lp, axis=1)
with np.errstate(under="ignore"):
log_resp = weighted_lp - lp_norm[:, None]
return lp_norm, log_resp

    def _E_step(self, data_matrix):
        """
        E-step: return the mean per-sample log-likelihood (the EM lower bound)
        and the log responsibilities.
        """
lp_norm, log_resp = self._calc_log_resp_and_norm(data_matrix)
return np.mean(lp_norm), log_resp

    def _compute_responsibilities(self, log_resp):
        """
        Exponentiate log_resp; return the responsibilities and the per-mixture effective counts.
        """
resp_matrix = np.exp(log_resp)
cluster_counts = resp_matrix.sum(axis=0) + 10 * np.finfo(resp_matrix.dtype).eps
return resp_matrix, cluster_counts

    def _update_mixture_weights(self, cluster_counts):
"""
Update mixture weights from mixture counts.
"""
self.mix_weights_ = cluster_counts / cluster_counts.sum()

    def _M_step(self, data_matrix, log_resp):
        """
        M-step: update weights and Beta distribution parameters via moment matching.
        """
        resp_matrix, cluster_counts = self._compute_responsibilities(log_resp)
        self._update_mixture_weights(cluster_counts)
        # Responsibility-weighted sums of x and x^2 per mixture and dimension
        w_sums = resp_matrix.T @ data_matrix
        w_sums_sq = resp_matrix.T @ (data_matrix ** 2)
        for m_idx in range(self.n_mixtures):
            sum_vals = w_sums[m_idx]
            sum_sq_vals = w_sums_sq[m_idx]
            mean_val = sum_vals / cluster_counts[m_idx]
            var_val = sum_sq_vals / cluster_counts[m_idx] - mean_val ** 2
            # Cap the variance at mean*(1-mean)/4 so the implied concentration
            # alpha + beta stays at least 3 and both parameters remain positive
            variance_cap = mean_val * (1 - mean_val) / 4
            var_val = np.minimum(var_val, variance_cap)
            var_val += 10 * np.finfo(var_val.dtype).eps
            # Method of moments for the Beta distribution:
            # alpha = k * mean, beta = k * (1 - mean), with k = mean*(1-mean)/var - 1
            scaling_factor = (mean_val * (1 - mean_val)) / (var_val + 1e-10) - 1
            self.beta_params_[m_idx, :self.n_components] = scaling_factor * mean_val
            self.beta_params_[m_idx, self.n_components:] = scaling_factor * (1 - mean_val)
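        # Worked example of the moment matching: Beta(2, 5) has mean = 2/7 and
        # var = 10 / (49 * 8); then k = mean*(1-mean)/var - 1 = 8 - 1 = 7, so
        # alpha = k * mean = 2 and beta = k * (1 - mean) = 5, recovering the parameters.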

    def fit(self, data_matrix, num_init=3, method="kmeans", max_iter=1000, tol=1e-4):
        """
        Fit the model with EM, keeping the best of num_init initializations.
        """
self.n_observations, self.n_components = data_matrix.shape
self.convergence = False
self.method = method
best_lower_bound = -np.inf
optimal_params = None
for init_round in range(num_init):
# print(f"{init_round + 1}-th BMM initialization")
self._init_clusters(data_matrix, init_round)
ll_bound = -np.inf
            for _ in range(max_iter):
                prev_bound = ll_bound
                ll_bound, log_resp = self._E_step(data_matrix)
                self._M_step(data_matrix, log_resp)
                if abs(ll_bound - prev_bound) < tol:
                    self.convergence = True
                    break
if ll_bound > best_lower_bound:
best_lower_bound = ll_bound
                # Refresh the weights from the final responsibilities before snapshotting
_, cluster_counts = self._compute_responsibilities(log_resp)
self._update_mixture_weights(cluster_counts)
optimal_params = (self.mix_weights_.copy(), self.beta_params_.copy())
self.mix_weights_, self.beta_params_ = optimal_params
self.max_lower_bound = best_lower_bound
return self

    def predict_proba(self, data_matrix):
"""
Return the per-mixture membership probabilities for each sample.
"""
_, log_resp = self._calc_log_resp_and_norm(data_matrix)
return np.exp(log_resp)

    def predict(self, data_matrix):
"""
Return the most probable mixture index for each sample.
"""
return np.argmax(self.predict_proba(data_matrix), axis=1)
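

# Minimal usage sketch: fit the mixture on synthetic two-component data.
# Assumes inputs lie strictly in (0, 1); exact 0s or 1s make np.log(...) hit -inf,
# hence the clipping below.
if __name__ == "__main__":
    rng = np.random.RandomState(0)
    # 200 two-dimensional samples from each of Beta(2, 8) and Beta(8, 2)
    low = rng.beta(2, 8, size=(200, 2))
    high = rng.beta(8, 2, size=(200, 2))
    X = np.clip(np.vstack([low, high]), 1e-6, 1 - 1e-6)

    bmm = BetaMixtureModel(n_mixtures=2).fit(X, num_init=3)
    print("converged:", bmm.convergence)
    print("mixture weights:", bmm.mix_weights_)
    print("hard assignments (first 5):", bmm.predict(X[:5]))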