Source code for atml.irt

"""
The :mod:`atml.irt` module contains a set of statistical models based on Item Response Theory.
"""
# Author: Hao Song (nuacesh@gmail.com)
# License: BSD-3

import numpy

import scipy.stats

import tensorflow as tf

from adhoc_opt.optimiser import parameter_update

tf.compat.v1.enable_eager_execution()

pi = tf.constant(numpy.pi, dtype='float32')

eps = 1e-6


class Beta_3_IRT:
    """
    The class for the Beta-3 IRT model.
    """

    def __init__(self):
        self.irt_type = 'beta3'
        self.N_dataset = None
        self.N_model = None
        self.logit_theta = None
        self.logit_delta = None
        self.log_a = None
        self.parameter = None

    def fit(self, dataset_list, dataset, model_list, model, measure):
        """
        Fit the IRT model to a collection of testing results.

        Parameters
        ----------
        dataset_list: list
            The list contains all the reference names of the datasets.
        dataset: numpy.ndarray
            A (n_experiment, ) numpy array contains the dataset index of each experiment.
        model_list: list
            The list contains all the reference names of the models.
        model: numpy.ndarray
            A (n_experiment, ) numpy array contains the model index of each experiment.
        measure: numpy.ndarray
            A (n_experiment, ) numpy array contains the performance measurement of each experiment.
        """
        self.N_dataset = len(dataset_list)
        self.N_model = len(model_list)

        self.logit_theta = numpy.zeros(self.N_model)
        self.logit_delta = numpy.zeros(self.N_dataset)
        self.log_a = numpy.zeros(self.N_dataset)

        parameter = tf.cast(numpy.hstack([self.logit_theta, self.logit_delta, self.log_a]), 'float32')

        measure[measure <= eps] = eps
        measure[measure >= (1.0 - eps)] = 1 - eps

        data = numpy.hstack([model.reshape(-1, 1), dataset.reshape(-1, 1), measure.reshape(-1, 1)])

        extra_args = ('beta3', (self.N_dataset, self.N_model))

        parameter = parameter_update(theta_0=tf.Variable(parameter), data=data, extra_args=extra_args,
                                     obj=get_obj, obj_g=get_obj_g, lr=1e-3, batch_size=1, val_size=None,
                                     factr=0.0, tol=8192, max_batch=int(1e8), plot_loss=False)

        self.parameter = parameter
        self.logit_theta = parameter[:self.N_model]
        self.logit_delta = parameter[self.N_model:self.N_model + self.N_dataset]
        self.log_a = parameter[self.N_model + self.N_dataset:self.N_model + 2 * self.N_dataset]

    def predict(self, dataset, model):
        """
        Predict the expected response for a set of combinations between different models and datasets.

        Parameters
        ----------
        dataset: numpy.ndarray
            A (n_test, ) numpy array contains the dataset index of each testing experiment.
        model: numpy.ndarray
            A (n_test, ) numpy array contains the model index of each testing experiment.

        Returns
        ----------
        E: numpy.ndarray
            A (n_test, ) numpy array contains the expected performance measurement of each testing experiment.
        """
        idx = numpy.arange(0, len(dataset), 8)
        E = []
        for i in range(0, len(idx)):
            E.append(get_E_beta_3(self.parameter, model[idx[i]:idx[i] + 8], dataset[idx[i]:idx[i] + 8],
                                  self.N_dataset, self.N_model))
        E = numpy.hstack(E)
        return E

    def curve(self, d_id):
        """
        Generate the item characteristic curve (expectation, 0.75, 0.5, 0.25 percentile) for a given dataset.

        Parameters
        ----------
        d_id: int
            The index of the given dataset.

        Returns
        ----------
        E: numpy.ndarray
            The curve values of the expectation.
        E_up: numpy.ndarray
            The curve values of the 0.75 percentile.
        E_mid: numpy.ndarray
            The curve values of the 0.5 percentile (median).
        E_low: numpy.ndarray
            The curve values of the 0.25 percentile.
        """
        E, E_up, E_mid, E_low = get_curve_beta3(self.parameter, d_id, self.N_dataset, self.N_model)
        return E, E_up, E_mid, E_low

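# An illustrative usage sketch (kept as a comment; the data below is hypothetical and only for
# illustration): encode datasets and models as integer indices, fit the Beta-3 model, and predict
# the expected measure for each experiment.
#
#   dataset_list = ['data_a', 'data_b']
#   model_list = ['model_x', 'model_y', 'model_z']
#   dataset = numpy.array([0, 0, 1, 1, 0, 1])                  # dataset index per experiment
#   model = numpy.array([0, 1, 1, 2, 2, 0])                    # model index per experiment
#   measure = numpy.array([0.7, 0.8, 0.6, 0.9, 0.5, 0.4])      # performance measure in (0, 1)
#
#   irt = Beta_3_IRT()
#   irt.fit(dataset_list, dataset, model_list, model, measure)
#   expected = irt.predict(dataset, model)                     # expected measure per experiment
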
class Logistic_IRT:
    """
    The class for the Logistic IRT model.
    """

    def __init__(self):
        self.irt_type = 'logistic'
        self.N_dataset = None
        self.N_model = None
        self.parameter = None
        self.theta = None
        self.delta = None
        self.log_a = None
        self.log_s2 = None

    def fit(self, dataset_list, dataset, model_list, model, measure):
        """
        Fit the IRT model to a collection of testing results.

        Parameters
        ----------
        dataset_list: list
            The list contains all the reference names of the datasets.
        dataset: numpy.ndarray
            A (n_experiment, ) numpy array contains the dataset index of each experiment.
        model_list: list
            The list contains all the reference names of the models.
        model: numpy.ndarray
            A (n_experiment, ) numpy array contains the model index of each experiment.
        measure: numpy.ndarray
            A (n_experiment, ) numpy array contains the performance measurement of each experiment.
        """
        self.N_dataset = len(dataset_list)
        self.N_model = len(model_list)

        self.theta = numpy.random.randn(self.N_model)
        self.delta = numpy.random.randn(self.N_dataset)
        self.log_a = numpy.random.randn(self.N_dataset)
        self.log_s2 = numpy.ones(self.N_dataset) * numpy.log(1e2)

        parameter = tf.cast(numpy.hstack([self.theta, self.delta, self.log_a, self.log_s2]), 'float32')

        measure[measure <= eps] = eps
        measure[measure >= (1.0 - eps)] = 1 - eps

        data = numpy.hstack([model.reshape(-1, 1), dataset.reshape(-1, 1), measure.reshape(-1, 1)])

        extra_args = ('logistic', (self.N_dataset, self.N_model))

        parameter = parameter_update(theta_0=tf.Variable(parameter), data=data, extra_args=extra_args,
                                     obj=get_obj, obj_g=get_obj_g, lr=1e-3, batch_size=1, val_size=None,
                                     factr=0.0, tol=8192, max_batch=int(1e8), plot_loss=False)

        self.parameter = parameter
        self.theta = parameter[:self.N_model]
        self.delta = parameter[self.N_model:self.N_model + self.N_dataset]
        self.log_a = parameter[self.N_model + self.N_dataset:self.N_model + 2 * self.N_dataset]
        self.log_s2 = parameter[self.N_model + 2 * self.N_dataset:self.N_model + 3 * self.N_dataset]

    def predict(self, dataset, model):
        """
        Predict the expected response for a set of combinations between different models and datasets.

        Parameters
        ----------
        dataset: numpy.ndarray
            A (n_test, ) numpy array contains the dataset index of each testing experiment.
        model: numpy.ndarray
            A (n_test, ) numpy array contains the model index of each testing experiment.

        Returns
        ----------
        E: numpy.ndarray
            A (n_test, ) numpy array contains the expected performance measurement of each testing experiment.
        """
        idx = numpy.arange(0, len(dataset), 8)
        E = []
        for i in range(0, len(idx)):
            E.append(get_E_logistic(self.parameter, model[idx[i]:idx[i] + 8], dataset[idx[i]:idx[i] + 8],
                                    self.N_dataset, self.N_model))
        return numpy.hstack(E)

    def curve(self, d_id):
        """
        Generate the item characteristic curve (expectation, 0.75, 0.5, 0.25 percentile) for a given dataset.

        Parameters
        ----------
        d_id: int
            The index of the given dataset.

        Returns
        ----------
        E: numpy.ndarray
            The curve values of the expectation.
        E_up: numpy.ndarray
            The curve values of the 0.75 percentile.
        E_mid: numpy.ndarray
            The curve values of the 0.5 percentile (median).
        E_low: numpy.ndarray
            The curve values of the 0.25 percentile.
        """
        E, E_up, E_mid, E_low = get_curve_logistic(self.parameter, d_id, self.N_dataset, self.N_model)
        return E, E_up, E_mid, E_low

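# An illustrative plotting sketch (kept as a comment; it assumes a fitted Logistic_IRT instance
# `irt` and that matplotlib is available, both of which are assumptions for illustration): draw
# the item characteristic curve of dataset 0 together with its quartile bands.
#
#   import matplotlib.pyplot as plt
#   E, E_up, E_mid, E_low = irt.curve(d_id=0)
#   plt.plot(E, label='expectation')
#   plt.plot(E_up, label='0.75 percentile')
#   plt.plot(E_mid, label='median')
#   plt.plot(E_low, label='0.25 percentile')
#   plt.legend()
#   plt.show()
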
def get_curve_logistic(parameter, did, N_data, N_flow):
    """
    Generate the item characteristic curve (expectation, 0.75, 0.5, 0.25 percentile) for a given dataset.

    Parameters
    ----------
    parameter: numpy.ndarray
        The estimated parameters for the Logistic IRT model.
    did: int
        The index of the given dataset.
    N_data: int
        The total number of datasets in the IRT model.
    N_flow: int
        The total number of models in the IRT model.

    Returns
    ----------
    E: numpy.ndarray
        The curve values of the expectation.
    E_up: numpy.ndarray
        The curve values of the 0.75 percentile.
    E_mid: numpy.ndarray
        The curve values of the 0.5 percentile (median).
    E_low: numpy.ndarray
        The curve values of the 0.25 percentile.
    """
    theta_edge = numpy.max(numpy.abs(parameter[:N_flow]))
    theta = tf.cast(numpy.linspace(-theta_edge, theta_edge, 128), dtype='float32')
    did = (numpy.ones(128) * did).astype('int')

    delta = tf.gather(parameter[N_flow:N_flow + N_data], did, axis=0)
    a = tf.math.exp(tf.gather(parameter[N_flow + N_data:N_flow + 2 * N_data], did, axis=0))
    log_s2 = tf.gather(parameter[N_flow + 2 * N_data:N_flow + 3 * N_data], did, axis=0)
    s2 = tf.math.exp(log_s2)
    s = tf.sqrt(s2)

    mu = - a * (theta.numpy().ravel() - delta.numpy().ravel())

    E = 1 / (1 + numpy.exp(mu / numpy.sqrt(1 + numpy.pi * numpy.square(s.numpy()) / 8)))
    E_up = 1 / (1 + numpy.exp(scipy.stats.norm.ppf(q=0.75, loc=mu, scale=s.numpy())))
    E_mid = 1 / (1 + numpy.exp(scipy.stats.norm.ppf(q=0.5, loc=mu, scale=s.numpy())))
    E_low = 1 / (1 + numpy.exp(scipy.stats.norm.ppf(q=0.25, loc=mu, scale=s.numpy())))

    return E, E_up, E_mid, E_low

def ml_logistic_obj(parameter, fid, did, measure, N_data, N_flow):
    """
    The log-likelihood objective function of the Logistic IRT model.

    Parameters
    ----------
    parameter: tensorflow.Variable
        The parameters for the Logistic IRT model.
    fid: tf.Tensor
        A (n_batch, ) tensorflow array contains the model index of each experiment.
    did: tf.Tensor
        A (n_batch, ) tensorflow array contains the dataset index of each experiment.
    measure: tf.Tensor
        A (n_batch, ) tensorflow array contains the performance measurement of each experiment.
    N_data: int
        The total number of datasets in the IRT model.
    N_flow: int
        The total number of models in the IRT model.

    Returns
    ----------
    L: tf.Tensor
        The averaged negative log-likelihood of the IRT model.
    """
    measure = tf.reshape(measure, -1)
    logit_measure = tf.math.log((1 - measure) / measure)

    theta = tf.gather(parameter[:N_flow], fid, axis=0)
    delta = tf.gather(parameter[N_flow:N_flow + N_data], did, axis=0)
    a = tf.math.exp(tf.gather(parameter[N_flow + N_data:N_flow + 2 * N_data], did, axis=0))
    log_s2 = tf.gather(parameter[N_flow + 2 * N_data:N_flow + 3 * N_data], did, axis=0)
    s2 = tf.math.exp(log_s2) + tf.constant(eps, dtype='float32')

    mu = - a * (theta - delta)

    diff = logit_measure - mu
    exp = tf.math.exp(- 0.5 * tf.math.square(diff) / s2)
    sample_lik = 1 / (tf.math.sqrt(2 * pi * s2)) * exp * (1 / (measure * (1 - measure)))
    loglik = tf.math.log(sample_lik + tf.constant(eps, dtype='float32'))

    # loglik = 0.5 * tf.math.log(1 / (2 * pi)) + 0.5 * tf.math.log(1 / s2) \
    #          - 0.5 * tf.math.square(logit_measure - mu) / s2 \
    #          + tf.math.log(1 / (measure * (1 - measure)))

    L = - tf.reduce_mean(loglik)

    return L

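# A minimal numerical sketch (values assumed for illustration) of the change-of-variables density
# used in ml_logistic_obj: p(measure) = Normal(log((1 - measure) / measure); mu, s2) / (measure * (1 - measure)).
_m, _mu, _s2 = 0.8, -0.5, 0.4
_z = numpy.log((1 - _m) / _m)
_lik = scipy.stats.norm.pdf(_z, loc=_mu, scale=numpy.sqrt(_s2)) / (_m * (1 - _m))
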
def get_E_logistic(parameter, fid, did, N_data, N_flow):
    """
    Predict the expected response for a set of combinations between different models and datasets.

    Parameters
    ----------
    parameter: numpy.ndarray
        The estimated parameters for the Logistic IRT model.
    fid: numpy.ndarray
        A (n_test, ) numpy array contains the model index of each testing experiment.
    did: numpy.ndarray
        A (n_test, ) numpy array contains the dataset index of each testing experiment.
    N_data: int
        The total number of datasets in the IRT model.
    N_flow: int
        The total number of models in the IRT model.

    Returns
    ----------
    E: numpy.ndarray
        A (n_test, ) numpy array contains the expected performance measurement of each testing experiment.
    """
    theta = tf.gather(parameter[:N_flow], fid, axis=0)
    delta = tf.gather(parameter[N_flow:N_flow + N_data], did, axis=0)
    a = tf.math.exp(tf.gather(parameter[N_flow + N_data:N_flow + 2 * N_data], did, axis=0))
    log_s2 = tf.gather(parameter[N_flow + 2 * N_data:N_flow + 3 * N_data], did, axis=0)
    s2 = tf.math.exp(log_s2) + tf.constant(eps, dtype='float32')
    s = tf.math.sqrt(s2)

    mu = - a * (theta - delta)

    samples = scipy.stats.norm.rvs(loc=mu, scale=s, size=[1024, len(mu)])
    E = numpy.mean(1 / (1 + numpy.exp(samples)), axis=0)

    return E

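# A minimal sketch (values assumed for illustration) relating the Monte Carlo expectation in
# get_E_logistic to the closed-form approximation used in get_curve_logistic:
# E[1 / (1 + exp(X))] with X ~ N(mu, s2) is approximately 1 / (1 + exp(mu / sqrt(1 + pi * s2 / 8))).
_mu, _s2 = -0.5, 0.4
_samples = scipy.stats.norm.rvs(loc=_mu, scale=numpy.sqrt(_s2), size=4096)
_E_mc = numpy.mean(1 / (1 + numpy.exp(_samples)))
_E_approx = 1 / (1 + numpy.exp(_mu / numpy.sqrt(1 + numpy.pi * _s2 / 8)))
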
def get_curve_beta3(parameter, did, N_data, N_flow):
    """
    Generate the item characteristic curve (expectation, 0.75, 0.5, 0.25 percentile) for a given dataset.

    Parameters
    ----------
    parameter: numpy.ndarray
        The estimated parameters for the Beta-3 IRT model.
    did: int
        The index of the given dataset.
    N_data: int
        The total number of datasets in the IRT model.
    N_flow: int
        The total number of models in the IRT model.

    Returns
    ----------
    E: numpy.ndarray
        The curve values of the expectation.
    E_up: numpy.ndarray
        The curve values of the 0.75 percentile.
    E_mid: numpy.ndarray
        The curve values of the 0.5 percentile (median).
    E_low: numpy.ndarray
        The curve values of the 0.25 percentile.
    """
    theta = tf.cast(numpy.linspace(1e-6, 1.0 - 1e-6, 128), dtype='float32')
    did = (numpy.ones(128) * did).astype('int')

    logit_delta = tf.cast(tf.gather(parameter[N_flow:N_flow + N_data], did, axis=0), 'float32')
    a = tf.cast(tf.math.exp(tf.gather(parameter[N_flow + N_data:N_flow + 2 * N_data], did, axis=0)), 'float32')

    delta = tf.clip_by_value(1 / (1 + tf.math.exp(logit_delta)),
                             tf.constant(eps, dtype='float32'), tf.constant(1 - eps, dtype='float32'))

    alpha = tf.math.pow(theta / delta, a) + tf.constant(eps, dtype='float32')
    beta = tf.math.pow((1 - theta) / (1 - delta), a) + tf.constant(eps, dtype='float32')

    E = alpha.numpy() / (alpha.numpy() + beta.numpy())
    E_up = scipy.stats.beta.ppf(q=0.75, a=alpha.numpy(), b=beta.numpy())
    E_mid = scipy.stats.beta.ppf(q=0.5, a=alpha.numpy(), b=beta.numpy())
    E_low = scipy.stats.beta.ppf(q=0.25, a=alpha.numpy(), b=beta.numpy())

    return E, E_up, E_mid, E_low

def ml_beta_3_obj(parameter, fid, did, measure, N_data, N_flow):
    """
    The log-likelihood objective function of the Beta-3 IRT model.

    Parameters
    ----------
    parameter: tensorflow.Variable
        The parameters for the Beta-3 IRT model.
    fid: tf.Tensor
        A (n_batch, ) tensorflow array contains the model index of each experiment.
    did: tf.Tensor
        A (n_batch, ) tensorflow array contains the dataset index of each experiment.
    measure: tf.Tensor
        A (n_batch, ) tensorflow array contains the performance measurement of each experiment.
    N_data: int
        The total number of datasets in the IRT model.
    N_flow: int
        The total number of models in the IRT model.

    Returns
    ----------
    L: tf.Tensor
        The averaged negative log-likelihood of the IRT model.
    """
    logit_theta = tf.gather(parameter[:N_flow], fid, axis=0)
    logit_delta = tf.gather(parameter[N_flow:N_flow + N_data], did, axis=0)
    a = tf.math.exp(tf.gather(parameter[N_flow + N_data:N_flow + 2 * N_data], did, axis=0))

    theta = tf.clip_by_value(1 / (1 + tf.math.exp(logit_theta)),
                             tf.constant(eps, dtype='float32'), tf.constant(1 - eps, dtype='float32'))
    delta = tf.clip_by_value(1 / (1 + tf.math.exp(logit_delta)),
                             tf.constant(eps, dtype='float32'), tf.constant(1 - eps, dtype='float32'))

    alpha = tf.math.pow(theta / delta, a) + tf.constant(eps, dtype='float32')
    beta = tf.math.pow((1 - theta) / (1 - delta), a) + tf.constant(eps, dtype='float32')

    measure = tf.reshape(measure, -1)

    loglik = (alpha - 1) * tf.math.log(measure) + (beta - 1) * tf.math.log(1 - measure) - \
             (tf.math.lgamma(alpha) + tf.math.lgamma(beta) - tf.math.lgamma(alpha + beta))

    L = - tf.reduce_mean(loglik)

    return L

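# A minimal sketch (values assumed for illustration): the log-likelihood in ml_beta_3_obj is the
# Beta log-density with alpha = (theta / delta) ** a and beta = ((1 - theta) / (1 - delta)) ** a,
# so it can be cross-checked against scipy.stats.beta.logpdf.
_theta, _delta, _a, _m = 0.7, 0.4, 1.5, 0.6
_alpha = (_theta / _delta) ** _a
_beta = ((1 - _theta) / (1 - _delta)) ** _a
_loglik = scipy.stats.beta.logpdf(_m, a=_alpha, b=_beta)
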
def get_E_beta_3(parameter, fid, did, N_data, N_flow):
    """
    Predict the expected response for a set of combinations between different models and datasets.

    Parameters
    ----------
    parameter: numpy.ndarray
        The estimated parameters for the Beta-3 IRT model.
    fid: numpy.ndarray
        A (n_test, ) numpy array contains the model index of each testing experiment.
    did: numpy.ndarray
        A (n_test, ) numpy array contains the dataset index of each testing experiment.
    N_data: int
        The total number of datasets in the IRT model.
    N_flow: int
        The total number of models in the IRT model.

    Returns
    ----------
    E: numpy.ndarray
        A (n_test, ) numpy array contains the expected performance measurement of each testing experiment.
    """
    logit_theta = tf.gather(parameter[:N_flow], fid, axis=0)
    logit_delta = tf.gather(parameter[N_flow:N_flow + N_data], did, axis=0)
    a = tf.math.exp(tf.gather(parameter[N_flow + N_data:N_flow + 2 * N_data], did, axis=0))

    theta = tf.clip_by_value(1 / (1 + tf.math.exp(logit_theta)),
                             tf.constant(eps, dtype='float32'), tf.constant(1 - eps, dtype='float32'))
    delta = tf.clip_by_value(1 / (1 + tf.math.exp(logit_delta)),
                             tf.constant(eps, dtype='float32'), tf.constant(1 - eps, dtype='float32'))

    alpha = tf.math.pow(theta / delta, a) + tf.constant(eps, dtype='float32')
    beta = tf.math.pow((1 - theta) / (1 - delta), a) + tf.constant(eps, dtype='float32')

    E = alpha / (alpha + beta)

    return E

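# A minimal sketch (values assumed for illustration): with the parameterisation above, the
# expectation E = alpha / (alpha + beta) has the closed form
# E = 1 / (1 + ((delta * (1 - theta)) / (theta * (1 - delta))) ** a).
_theta, _delta, _a = 0.7, 0.4, 1.5
_E = 1 / (1 + ((_delta * (1 - _theta)) / (_theta * (1 - _delta))) ** _a)
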
def get_obj(parameter, data, extra_args):
    """
    Wrapper function to get the value of the objective function.

    Parameters
    ----------
    parameter: tensorflow.Variable
        The parameter of the IRT model.
    data: tensorflow.Tensor
        A (n_batch, 3) tensor contains the model index, dataset index, and performance measure of each experiment.
    extra_args: tuple
        A tuple contains extra information for the IRT model:
        (irt type, (total number of datasets, total number of models)).

    Returns
    ----------
    L: tensorflow.Tensor
        The averaged negative log-likelihood of the IRT model.
    """
    fid = tf.cast(data[:, 0], 'int32')
    did = tf.cast(data[:, 1], 'int32')
    measure = tf.cast(data[:, 2:], 'float32')

    irt_type, tmp_args = extra_args

    if irt_type == 'beta3':
        N_data, N_flow = tmp_args
        L = ml_beta_3_obj(parameter, fid, did, measure, N_data, N_flow)
    elif irt_type == 'logistic':
        N_data, N_flow = tmp_args
        L = ml_logistic_obj(parameter, fid, did, measure, N_data, N_flow)
    else:
        L = None

    return L

def get_obj_g(parameter, data, extra_args):
    """
    Wrapper function to get the gradient of the objective function.

    Parameters
    ----------
    parameter: tensorflow.Variable
        The parameter of the IRT model.
    data: tensorflow.Tensor
        A (n_batch, 3) tensor contains the model index, dataset index, and performance measure of each experiment.
    extra_args: tuple
        A tuple contains extra information for the IRT model:
        (irt type, (total number of datasets, total number of models)).

    Returns
    ----------
    L: tensorflow.Tensor
        The value of the objective function.
    g: tensorflow.Tensor
        The gradient of the objective function with respect to the parameter.
    """
    with tf.GradientTape() as gt:
        gt.watch(parameter)
        L = get_obj(parameter, data, extra_args)
    g = gt.gradient(L, parameter)

    return L, g

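# An illustrative call with toy sizes (2 models, 3 datasets; all values assumed for illustration):
# evaluate the Beta-3 objective and its gradient for a single experiment
# (model index 0, dataset index 1, measure 0.8).
_toy_parameter = tf.Variable(tf.zeros(2 + 3 + 3, dtype='float32'))  # logit_theta, logit_delta, log_a
_toy_data = tf.constant([[0.0, 1.0, 0.8]], dtype='float32')
_toy_L, _toy_g = get_obj_g(_toy_parameter, _toy_data, ('beta3', (3, 2)))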