Source code for fast.sampling.rankings

# Author: Maxwell I. Zimmerman <mizimmer@wustl.edu>
# Contributors:
# Copywright (C) 2017, Washington University in St. Louis
# All rights reserved.
# Unauthorized copying of this file, via any medium, is strictly prohibited
# Proprietary and confidential


#######################################################################
# imports
#######################################################################


import enspara.tpt
#import msmbuilder.tpt
import numpy as np
import time
import scipy.sparse as spar
from . import scalings
from ..base import base
from ..exception import ConvergenceWarning, ImproperlyConfigured


########################################################################
# helper functions
########################################################################


def _evens_select_states(unique_states, n_clones):
    """Helper function for evens state selection. Picks among all
    discovered states evenly. If more states were discovered than
    clones, randomly picks remainder states."""
    # calculate the number of clones per state and the balance to
    # match n_clones
    clones_per_state = int(n_clones / len(unique_states))
    remainder_states = n_clones % len(unique_states)
    # generate states to simulate list
    repeat_states_to_simulate = np.repeat(unique_states, clones_per_state)
    remainder_states_to_simulate = np.random.choice(
        unique_states, remainder_states, replace=False)
    total_states_to_simulate = np.concatenate(
        [repeat_states_to_simulate, remainder_states_to_simulate])
    return total_states_to_simulate


def _unbias_state_selection(states, rankings, n_selections, select_max=True):
    """Unbiases state selection due to state labeling. Shuffles states
    with equivalent rankings so that state index does not influence
    selection probability."""
    # determine the unique ranking values
    unique_rankings = np.unique(rankings)
    # sort states and rankings
    iis_sort = np.argsort(rankings)
    sorted_rankings = rankings[iis_sort]
    sorted_states = states[iis_sort]
    # if we're maximizing the rankings, reverse the sorts
    if select_max:
        unique_rankings = unique_rankings[::-1]
        sorted_rankings = sorted_rankings[::-1]
        sorted_states = sorted_states[::-1]
    # shuffle states with unique rankings to unbias selections
    # this is done upto n_selections
    tot_state_count = 0
    for ranking_num in unique_rankings:
        iis = np.where(sorted_rankings == ranking_num)[0]
        sorted_states[iis] = sorted_states[np.random.permutation(iis)]
        tot_state_count += len(iis)
        if tot_state_count > n_selections:
            break
    return sorted_states[:n_selections]


def _select_states_spreading(
        rankings, unique_states, n_clones, centers, distance_metric,
        select_max=True, width=1.0, non_overlap=True):
    """Selects states that maximize ranking and are structurally unique.
    This proceeds as follows: 1) select a state that maximizes the
    ranking 2) penalize states that are structurally similar to
    previously selected, 3) select a new state that maximizes the
    updated ranking, and 4) repeat 2-3 until n_clones are chosen.

    Parameters
    ----------
    rankings : array, shape=(n_states,)
        The list of state rankings.
    unique_states : array, shape=(n_states,)
        The list of state indices
    n_clones : int,
        Number of states to select.
    centers : array-like,
        A list of state centers.
    distance_metric : function,
        Function for calculating distances between state centers.
    select_max : bool, default = True
        Optionally select from maximum.
    width : float, default = 1.0
        Gaussian width for calculating penalties.
    non_overlap : bool, default = True
        Optionally ensure that states are not sampled with replacement.
    
    Returns
    ----------
    states_to_simulate : array, shape=(n_clones,)
        The list of state indices to restart simulations from.
    """
    # pick the first state
    states_to_simulate = [
        _unbias_state_selection(
            unique_states, rankings, 1, select_max=select_max)[0]]
    # initialize distance list
    dist_list = []
    # iterate state selection
    for num in range(n_clones-1):
        # get distances to previously selected states
        dist_list.append(
            distance_metric(
                centers[unique_states], centers[states_to_simulate[-1]]))
        # convert distances to gaussian penalties
        gaussian_weights = np.sum(
            [
                (1 - np.exp(-(dists**2)/float(2.0*(width**2))))
                for dists in dist_list], axis=0) / len(dist_list)
        # generate new rankings
        new_rankings = rankings + gaussian_weights
        # if only selecting new states, zero the rankings of those
        # previously selected
        if non_overlap:
            states_to_zero = np.array(
                [
                    np.where(unique_states == state)[0]
                    for state in states_to_simulate])
            if select_max:
                new_rankings[states_to_zero] = 0
            else:
                new_rankings[states_to_zero] = np.inf
        # pick next state
        states_to_simulate.append(
            _unbias_state_selection(
                unique_states, new_rankings, 1, select_max=select_max)[0])
    # numpy it and return
    states_to_simulate = np.array(states_to_simulate)
    return states_to_simulate


[docs]def get_unique_states(msm): """returns a list of the visited states within an msm object""" tcounts = msm.tcounts_ unique_states = np.unique(np.nonzero(tcounts)[0]) return unique_states
######################################################################## # page rankings # ########################################################################
[docs]def generate_aij(tcounts, spreading=False): """Generates the adjacency matrix used for page ranking. Parameters ---------- tcounts : matrix, shape=(n_states, n_states) The count matrix of an MSM. Can be dense or sparse. spreading : bool, default=False Optionally transposes matrix to do counts spreading instead of page rank. Returns ---------- aij : matrix, shape=(n_states, n_states) The adjacency matrix used for page ranking. """ # check if sparse matrix if not spar.isspmatrix(tcounts): iis = np.where(tcounts != 0) tcounts = spar.coo_matrix( (tcounts[iis], iis), shape=(len(tcounts), len(tcounts))) else: tcounts = tcounts.tocoo() # get row and col information row_info = tcounts.row col_info = tcounts.col tcounts.data = np.zeros(len(tcounts.data)) + 1 tcounts.setdiag(0) # Sets diagonal to zero tcounts = tcounts.tocsr() tcounts.eliminate_zeros() # optionally does spreading if spreading: tcounts = (tcounts + tcounts.T) / 2. aij = normalize(tcounts, norm='l1', axis=1) else: # Set 0 connections to 1 ( # this doesn't change anything and avoids dividing by zero) connections = tcounts.sum(axis = 1) iis = np.where(connections == 0) connections[iis] = 1 # Convert 1/Connections to sparse matrix format connections = spar.coo_matrix(connections).data con_len = len(connections) iis = (np.array(range(con_len)), np.array(range(con_len))) inv_connections = spar.coo_matrix( (connections**-1, iis), shape=(con_len, con_len)) aij = tcounts.transpose() * inv_connections return aij
[docs]def rank_aij(aij, d=0.85, Pi=None, max_iters=100000, norm=True): """Ranks the adjacency matrix. Parameters ---------- aij : matrix The adjacency matrix used for ranking. d : float The weight of page ranks [0, 1]. A value of 1 is pure page rank and 0 is all the initial ranks. Pi : array, default=None The prior ranks. max_iters : int, default=100000 The maximum number of iterations to check for convergence. norm : bool, default=True Normilizes output ranks Returns ---------- The rankings of each state """ N = float(aij.shape[0]) # if Pi is None, set it to 1/total states if Pi is None: Pi = np.zeros(int(N)) Pi[:] = 1/N # set error for page ranks error = 1 / N**2 # first pass of rankings new_page_rank = (1 - d) * Pi + d * aij.dot(Pi) pr_error = np.sum(np.abs(Pi - new_page_rank)) page_rank = new_page_rank # iterate until error is below threshold iters = 0 while pr_error > error: new_page_rank = (1 - d) * Pi + d * aij.dot(page_rank) pr_error = np.sum(np.abs(page_rank - new_page_rank)) page_rank = new_page_rank iters += 1 # error out if does not converge if iters > max_iters: raise ConvergenceWarning( 'page ranking failed to converge in %s steps' % max_iters) # normalize rankings if norm: page_rank *= 100./page_rank.sum() return page_rank
######################################################################## # ranking classes # ########################################################################
[docs]class evens(base): """Evens ranking object""" def __init__(self): pass @property def class_name(self): return "evens" @property def config(self): return { }
[docs] def rank(self, msm, unique_states=None): return None
[docs] def select_states(self, msm, n_clones): unique_states = get_unique_states(msm) return _evens_select_states(unique_states, n_clones)
[docs]class base_ranking(base): """base ranking class. Pieces out selection of states from independent rankings""" def __init__( self, maximize_ranking=True, state_centers=None, distance_metric=None, width=1.0): self.maximize_ranking = maximize_ranking self.state_centers = state_centers self.distance_metric = distance_metric self.width = width
[docs] def select_states(self, msm, n_clones): # determine discovered states from msm unique_states = get_unique_states(msm) # if not enough discovered states for selection of n_clones, # selects states using the evens method if len(unique_states) < n_clones: states_to_simulate = _evens_select_states(unique_states, n_clones) # selects the n_clones with minimum counts else: rankings = self.rank(msm, unique_states=unique_states) # if a state-ranking is `nan`, does not consider it in rankings. non_nan_rank_iis = np.where(1 - np.isnan(rankings))[0] # if not enought non-`nan` states are discivered, performs evens if len(non_nan_rank_iis) < n_clones: states_to_simulate = _evens_select_states( unique_states[non_nan_rank_iis], n_clones) else: if (self.state_centers is None) or (self.distance_metric is None): states_to_simulate = _unbias_state_selection( unique_states[non_nan_rank_iis], rankings[non_nan_rank_iis], n_clones, select_max=self.maximize_ranking) else: states_to_simulate = _select_states_spreading( rankings[non_nan_rank_iis], unique_states[non_nan_rank_iis], n_clones, centers=self.state_centers, distance_metric=self.distance_metric, select_max=self.maximize_ranking, width=self.width) return states_to_simulate
[docs]class page_ranking(base_ranking): """page ranking. ri = (1-d)*init_ranks + d*aij""" def __init__( self, d, init_pops=True, max_iters=100000, norm=True, spreading=False, maximize_ranking=True, **kwargs): """ Parameters ---------- d : float The weight of page ranks [0, 1]. A value of 1 is pure page rank and 0 is all the initial ranks. init_pops : bool, default=True Optionally uses the populations within an MSM as the initial ranks. max_iters : int, default=100000 The maximum number of iterations to check for convergence. norm : bool, default=True Normilizes output ranks spreading : bool, default = False Solves for page ranks with the transpose of aij. """ self.d = d self.init_pops = init_pops self.max_iters = max_iters self.norm = norm self.spreading = spreading base_ranking.__init__( self, maximize_ranking=maximize_ranking, **kwargs) @property def class_name(self): return "page_ranking" @property def config(self): return { 'd': self.d, 'init_pops': self.init_pops, 'max_iters': self.max_iters, 'norm': self.norm, 'spreading': self.spreading, 'maximize_ranking': self.maximize_ranking, }
[docs] def rank(self, msm, unique_states=None): # generate aij matrix if unique_states is None: unique_states = get_unique_states(msm) tcounts = msm.tcounts_ tcounts_sub = tcounts.tocsr()[:, unique_states][unique_states, :] aij = generate_aij(tcounts_sub) # determine the initial ranks if self.init_pops: Pi = msm.eq_probs_[unique_states] else: Pi = None rankings = rank_aij( aij, d=self.d, Pi=Pi, max_iters=self.max_iters, norm=self.norm) return rankings
[docs]class counts(base_ranking): """Min-counts ranking object. Ranks states based on their raw counts.""" def __init__(self, maximize_ranking=False, scaling=None, **kwargs): self.scaling = scaling base_ranking.__init__( self, maximize_ranking=maximize_ranking, **kwargs) @property def class_name(self): return "counts" @property def config(self): return { 'maximize_ranking': self.maximize_ranking, }
[docs] def rank(self, msm, unique_states=None): counts_per_state = np.array(msm.tcounts_.sum(axis=1)).flatten() if unique_states is None: unique_states = np.where(counts_per_state > 0)[0] counts_return = counts_per_state[unique_states] if self.scaling is not None: counts_return = self.scaling.scale(counts_return) return counts_return
[docs]class FAST(base_ranking): """FAST ranking object""" def __init__( self, state_rankings=None, directed_scaling = scalings.feature_scale(maximize=True), statistical_component = counts(), statistical_scaling = scalings.feature_scale(maximize=False), alpha = 1, alpha_percent=False, maximize_ranking=True, **kwargs): """ Parameters ---------- state_rankings : array, shape = (n_states, ) An array with ranking values for each state. This is previously calculated for each state in the MSM. directed_scaling : scaling object, default = feature_scale(maximize=True) An object used for scaling the directed component values. statistical_component : ranking function A function that has an enspara msm object as input, and returns the unique states and statistical rankings per state. statistical_scaling : scaling object, default = feature_scale(maximize=False) An object used for scaling the statistical component values. alpha : float, default = 1 The weighting of the statistical component to FAST. i.e. r_i = directed + alpha * undirected alpha_percent : bool, default=False Optionally treat the alpha value as a percent. i.e. r_i = (1 - alpha) * directed + alpha * undirected """ self.state_rankings = state_rankings self.directed_scaling = directed_scaling self.statistical_component = statistical_component self.statistical_scaling = statistical_scaling self.alpha = alpha self.alpha_percent = alpha_percent if self.alpha_percent and ((self.alpha < 0) or (self.alpha > 1)): raise ImproperlyConfigured( 'alpha_percent is selected, although alpha is not between 0 and 1!') base_ranking.__init__( self, maximize_ranking=maximize_ranking, **kwargs) @property def class_name(self): return "FAST" @property def config(self): return { 'state_ranking': self.state_rankings, 'directed_scaling': self.directed_scaling, 'statistical_component': self.statistical_component, 'statistical_scaling': self.statistical_scaling, 'alpha': self.alpha, 'alpha_percent': self.alpha_percent, 'maximize_ranking': self.maximize_ranking }
[docs] def rank(self, msm, unique_states=None): # determine unique states if unique_states is None: unique_states = get_unique_states(msm) if self.statistical_component is None: statistical_weights = np.zeros(unique_states.shape) else: # get statistical component statistical_ranking = self.statistical_component.rank(msm) # scale the statistical weights statistical_weights = self.statistical_scaling.scale( statistical_ranking) # scale the directed weights directed_weights = self.directed_scaling.scale( self.state_rankings[unique_states]) # determine rankings if self.alpha_percent: total_rankings = (1-self.alpha)*directed_weights + \ self.alpha*statistical_weights else: total_rankings = directed_weights + self.alpha*statistical_weights return total_rankings
[docs]class string(base_ranking): """Uses the string method with MSMs to relax pathway. Samples from states along the highest flux pathway between start-states and end-states. Uses the statistical component to rank the states on this pathway. Parameters ---------- start_states : int or array-like, shape = (n_start_states, ) The starting states for defining the pathway. end_states : int of array-like, shape = (n_end_states, ) The ending states for defining the pathway. statistical_component : ranking function A ranking class object to rank the pathway states. If none is selected, evens is used. maximize_ranking : bool, default=False, Optionally maximize the ranking. This will favor states with high statistical components, i.e. favor states with high counts (unlikely to be desireable). """ def __init__( self, start_states, end_states, statistical_component=None, maximize_ranking=False, **kwargs): self.start_states = start_states self.end_states = end_states self.statistical_component = statistical_component base_ranking.__init__( self, maximize_ranking=maximize_ranking, **kwargs) @property def class_name(self): return "string" @property def config(self): return { 'start_states': self.start_states, 'end_states': self.end_states, 'statistical_component': self.statistical_component, 'maximize_ranking': self.maximize_ranking, }
[docs] def rank(self, msm, unique_states=None): # determine unique states if unique_states is None: unique_states = get_unique_states(msm) if self.statistical_component is None: statistical_ranking = np.zeros(unique_states.shape) else: # get statistical component statistical_ranking = self.statistical_component.rank(msm) # determine the highest flux pathway between states if spar.issparse(msm.tprobs_): tprobs = np.array(msm.tprobs_.todense()) else: tprobs = msm.tprobs_ nfm = enspara.tpt.net_fluxes( tprobs, self.start_states, self.end_states, populations=msm.eq_probs_) path, flux = msmbuilder.tpt.top_path( self.start_states, self.end_states, nfm) # make all non-pathway states `nan` path_states = np.unique(path.flatten()) path_iis = np.array( [ np.where(unique_states == path_state)[0][0] for path_state in path]) non_path_iis = np.setdiff1d(range(len(unique_states)), path_iis) new_rankings = np.array(np.copy(statistical_ranking), dtype=float) new_rankings[non_path_iis] = np.nan return new_rankings