Source code for pycalphad.core.equilibrium

"""
The equilibrium module defines routines for interacting with
calculated phase equilibria.
"""
import warnings
from collections import OrderedDict
from collections.abc import Mapping
from datetime import datetime
import pycalphad.variables as v
from pycalphad.core.utils import unpack_components, unpack_condition, unpack_phases, filter_phases, instantiate_models, get_state_variables
from pycalphad import calculate
from pycalphad.core.errors import EquilibriumError, ConditionError
from pycalphad.core.starting_point import starting_point
from pycalphad.codegen.callables import build_phase_records
from pycalphad.core.eqsolver import _solve_eq_at_conditions
from pycalphad.core.phase_rec import PhaseRecord
from pycalphad.core.solver import Solver
from pycalphad.core.light_dataset import LightDataset
from pycalphad.model import Model
import numpy as np


def _adjust_conditions(conds):
    "Adjust conditions values to be within the numerical limit of the solver."
    new_conds = OrderedDict()
    minimum_composition = 1e-10
    for key, value in sorted(conds.items(), key=str):
        if key == str(key):
            key = getattr(v, key, key)
        if isinstance(key, v.MoleFraction):
            vals = unpack_condition(value)
            # "Zero" composition is a common pattern. Do not warn for that case.
            if np.any(np.logical_and(np.asarray(vals) < minimum_composition, np.asarray(vals) > 0)):
                warnings.warn(
                    f"Some specified compositions are below the minimum allowed composition of {minimum_composition}.")
            new_conds[key] = [max(val, minimum_composition) for val in vals]
        else:
            new_conds[key] = unpack_condition(value)
    return new_conds


def _eqcalculate(dbf, comps, phases, conditions, output, data=None, per_phase=False, callables=None, model=None,
                 parameters=None, **kwargs):
    """
    WARNING: API/calling convention not finalized.
    Compute the *equilibrium value* of a property.
    This function differs from `calculate` in that it computes
    thermodynamic equilibrium instead of randomly sampling the
    internal degrees of freedom of a phase.
    Because of that, it's slower than `calculate`.
    This plugs in the equilibrium phase and site fractions
    to compute a thermodynamic property defined in a Model.

    Parameters
    ----------
    dbf : Database
        Thermodynamic database containing the relevant parameters.
    comps : list
        Names of components to consider in the calculation.
    phases : list or dict
        Names of phases to consider in the calculation.
    conditions : dict or (list of dict)
        StateVariables and their corresponding value.
    output : str
        Equilibrium model property (e.g., CPM, HM, etc.) to compute.
        This must be defined as an attribute in the Model class of each phase.
    data : Dataset
        Previous result of call to `equilibrium`.
        Should contain the equilibrium configurations at the conditions of interest.
        If the databases are not the same as in the original calculation,
        the results may be meaningless.
    per_phase : bool, optional
        If True, compute and return the property for each phase present.
        If False, return the total system value, weighted by the phase fractions.
    callables : dict
        Callable functions to compute 'output' for each phase.
    model : a dict of phase names to Model
        Model class to use for each phase.
    parameters : dict, optional
        Maps SymEngine Symbol to numbers, for overriding the values of parameters in the Database.
    kwargs
        Passed to `calculate`.

    Returns
    -------
    Dataset of property as a function of equilibrium conditions
    """
    if data is None:
        raise ValueError('Required kwarg "data" is not specified')
    if model is None:
        raise ValueError('Required kwarg "model" is not specified')
    active_phases = unpack_phases(phases)
    conds = _adjust_conditions(conditions)
    indep_vars = ['N', 'P', 'T']
    # TODO: Rewrite this to use the coord dict from 'data'
    str_conds = OrderedDict((str(key), value) for key, value in conds.items())
    indep_vals = list([float(x) for x in np.atleast_1d(val)]
                      for key, val in str_conds.items() if key in indep_vars)
    coord_dict = str_conds.copy()
    components = [x for x in sorted(comps)]
    desired_active_pure_elements = [list(x.constituents.keys()) for x in components]
    desired_active_pure_elements = [el.upper() for constituents in desired_active_pure_elements for el in constituents]
    pure_elements = sorted(set([x for x in desired_active_pure_elements if x != 'VA']))
    coord_dict['vertex'] = np.arange(len(pure_elements) + 1)  # +1 is to accommodate the degenerate degree of freedom at the invariant reactions
    grid_shape = np.meshgrid(*coord_dict.values(),
                             indexing='ij', sparse=False)[0].shape
    prop_shape = grid_shape
    prop_dims = list(str_conds.keys()) + ['vertex']

    result = LightDataset({output: (prop_dims, np.full(prop_shape, np.nan))}, coords=coord_dict)
    # For each phase select all conditions where that phase exists
    # Perform the appropriate calculation and then write the result back
    for phase in active_phases:
        dof = len(model[phase].site_fractions)
        current_phase_indices = (data.Phase == phase)
        if ~np.any(current_phase_indices):
            continue
        points = data.Y[np.nonzero(current_phase_indices)][..., :dof]
        statevar_indices = np.nonzero(current_phase_indices)[:len(indep_vals)]
        statevars = {key: np.take(np.asarray(vals), idx)
                     for key, vals, idx in zip(indep_vars, indep_vals, statevar_indices)}
        statevars.update(kwargs)
        if statevars.get('mode', None) is None:
            statevars['mode'] = 'numpy'
        calcres = calculate(dbf, comps, [phase], output=output, points=points, broadcast=False,
                            callables=callables, parameters=parameters, model=model, **statevars)
        result[output][np.nonzero(current_phase_indices)] = calcres[output].values
    if not per_phase:
        out = np.nansum(result[output] * data['NP'], axis=-1)
        dv_output = result.data_vars[output]
        result.remove(output)
        # remove the vertex coordinate because we summed over it
        result.add_variable(output, dv_output[0][:-1], out)
    else:
        dv_phase = data.data_vars['Phase']
        dv_np = data.data_vars['NP']
        result.add_variable('Phase', dv_phase[0], dv_phase[1])
        result.add_variable('NP', dv_np[0], dv_np[1])
    return result


[docs] def equilibrium(dbf, comps, phases, conditions, output=None, model=None, verbose=False, broadcast=True, calc_opts=None, to_xarray=True, scheduler='sync', parameters=None, solver=None, callables=None, phase_records=None, **kwargs): """ Calculate the equilibrium state of a system containing the specified components and phases, under the specified conditions. Parameters ---------- dbf : Database Thermodynamic database containing the relevant parameters. comps : list Names of components to consider in the calculation. phases : list or dict Names of phases to consider in the calculation. conditions : dict or (list of dict) StateVariables and their corresponding value. output : str or list of str, optional Additional equilibrium model properties (e.g., CPM, HM, etc.) to compute. These must be defined as attributes in the Model class of each phase. model : Model, a dict of phase names to Model, or a seq of both, optional Model class to use for each phase. verbose : bool, optional Print details of calculations. Useful for debugging. broadcast : bool If True, broadcast conditions against each other. This will compute all combinations. If False, each condition should be an equal-length list (or single-valued). Disabling broadcasting is useful for calculating equilibrium at selected conditions, when those conditions don't comprise a grid. calc_opts : dict, optional Keyword arguments to pass to `calculate`, the energy/property calculation routine. to_xarray : bool Whether to return an xarray Dataset (True, default) or an EquilibriumResult. scheduler : Dask scheduler, optional Job scheduler for performing the computation. If None, return a Dask graph of the computation instead of actually doing it. parameters : dict, optional Maps SymEngine Symbol to numbers, for overriding the values of parameters in the Database. solver : pycalphad.core.solver.SolverBase Instance of a solver that is used to calculate local equilibria. Defaults to a pycalphad.core.solver.Solver. callables : dict, optional Pre-computed callable functions for equilibrium calculation. phase_records : Optional[Mapping[str, PhaseRecord]] Mapping of phase names to PhaseRecord objects with `'GM'` output. Must include all active phases. The `model` argument must be a mapping of phase names to instances of Model objects. Returns ------- Structured equilibrium calculation, or Dask graph if scheduler=None. Examples -------- None yet. """ if not broadcast: raise NotImplementedError('Broadcasting cannot yet be disabled') comps = sorted(unpack_components(dbf, comps)) phases = unpack_phases(phases) or sorted(dbf.phases.keys()) list_of_possible_phases = filter_phases(dbf, comps) if len(list_of_possible_phases) == 0: raise ConditionError('There are no phases in the Database that can be active with components {0}'.format(comps)) active_phases = filter_phases(dbf, comps, phases) if len(active_phases) == 0: raise ConditionError('None of the passed phases ({0}) are active. List of possible phases: {1}.'.format(phases, list_of_possible_phases)) if isinstance(comps, (str, v.Species)): comps = [comps] if len(set(comps) - set(dbf.species)) > 0: raise EquilibriumError('Components not found in database: {}' .format(','.join([c.name for c in (set(comps) - set(dbf.species))]))) calc_opts = calc_opts if calc_opts is not None else dict() solver = solver if solver is not None else Solver(verbose=verbose) parameters = parameters if parameters is not None else dict() if isinstance(parameters, dict): parameters = OrderedDict(sorted(parameters.items(), key=str)) # Temporary solution until constraint system improves if conditions.get(v.N) is None: conditions[v.N] = 1 if np.any(np.array(conditions[v.N]) != 1): raise ConditionError('N!=1 is not yet supported, got N={}'.format(conditions[v.N])) # Modify conditions values to be within numerical limits, e.g., X(AL)=0 # Also wrap single-valued conditions with lists conds = _adjust_conditions(conditions) for cond in conds.keys(): if isinstance(cond, (v.MoleFraction, v.ChemicalPotential)) and cond.species not in comps: raise ConditionError('{} refers to non-existent component'.format(cond)) str_conds = OrderedDict((str(key), value) for key, value in conds.items()) components = [x for x in sorted(comps)] desired_active_pure_elements = [list(x.constituents.keys()) for x in components] desired_active_pure_elements = [el.upper() for constituents in desired_active_pure_elements for el in constituents] pure_elements = sorted(set([x for x in desired_active_pure_elements if x != 'VA'])) if verbose: print('Components:', ' '.join([str(x) for x in comps])) print('Phases:', end=' ') output = output if output is not None else 'GM' output = output if isinstance(output, (list, tuple, set)) else [output] output = set(output) output |= {'GM'} output = sorted(output) if phase_records is None: models = instantiate_models(dbf, comps, active_phases, model=model, parameters=parameters) phase_records = build_phase_records(dbf, comps, active_phases, conds, models, output='GM', callables=callables, parameters=parameters, verbose=verbose, build_gradients=True, build_hessians=True) else: # phase_records were provided, instantiated models must also be provided by the caller models = model if not isinstance(models, Mapping): raise ValueError("A dictionary of instantiated models must be passed to `equilibrium` with the `model` argument if the `phase_records` argument is used.") active_phases_without_models = [name for name in active_phases if not isinstance(models.get(name), Model)] active_phases_without_phase_records = [name for name in active_phases if not isinstance(phase_records.get(name), PhaseRecord)] if len(active_phases_without_phase_records) > 0: raise ValueError(f"phase_records must contain a PhaseRecord instance for every active phase. Missing PhaseRecord objects for {sorted(active_phases_without_phase_records)}") if len(active_phases_without_models) > 0: raise ValueError(f"model must contain a Model instance for every active phase. Missing Model objects for {sorted(active_phases_without_models)}") if verbose: print('[done]', end='\n') state_variables = sorted(get_state_variables(models=models, conds=conds), key=str) # 'calculate' accepts conditions through its keyword arguments grid_opts = calc_opts.copy() statevar_strings = [str(x) for x in state_variables] grid_opts.update({key: value for key, value in str_conds.items() if key in statevar_strings}) if 'pdens' not in grid_opts: grid_opts['pdens'] = 60 grid = calculate(dbf, comps, active_phases, model=models, fake_points=True, phase_records=phase_records, output='GM', parameters=parameters, to_xarray=False, **grid_opts) coord_dict = str_conds.copy() coord_dict['vertex'] = np.arange(len(pure_elements) + 1) # +1 is to accommodate the degenerate degree of freedom at the invariant reactions coord_dict['component'] = pure_elements properties = starting_point(conds, state_variables, phase_records, grid) properties = _solve_eq_at_conditions(properties, phase_records, grid, list(str_conds.keys()), state_variables, verbose, solver=solver) # Compute equilibrium values of any additional user-specified properties # We already computed these properties so don't recompute them output = sorted(set(output) - {'GM', 'MU'}) for out in output: if (out is None) or (len(out) == 0): continue # TODO: How do we know if a specified property should be per_phase or not? # For now, we make a best guess if (out == 'degree_of_ordering') or (out == 'DOO'): per_phase = True else: per_phase = False eqcal = _eqcalculate(dbf, comps, active_phases, conditions, out, data=properties, per_phase=per_phase, model=models, callables=callables, parameters=parameters, **calc_opts) properties = properties.merge(eqcal, inplace=True, compat='equals') if to_xarray: properties = properties.get_dataset() properties.attrs['created'] = datetime.utcnow().isoformat() if len(kwargs) > 0: warnings.warn('The following equilibrium keyword arguments were passed, but unused:\n{}'.format(kwargs)) return properties