"""
The utils module handles helper routines for equilibrium calculation.
"""
from sympy.utilities.lambdify import lambdify
from sympy import Symbol
import numpy as np
import operator
import functools
import itertools
import collections
from collections.abc import Iterable, Mapping

try:
# Only available in numpy 1.10 and newer
except ImportError:
"Broadcast an array to a desired shape. Returns a view."

[docs]def point_sample(comp_count, pdof=10):
"""
Sample 'pdof * (sum(comp_count) - len(comp_count))' points in
composition space for the sublattice configuration specified
by 'comp_count'. Points are sampled quasi-randomly from a Halton sequence.
A Halton sequence is like a uniform random distribution, but the
result will always be the same for a given 'comp_count' and 'pdof'.
Note: For systems with only one component, only one point will be
returned, regardless of 'pdof'. This is because the degrees of freedom
are zero for that case.

Parameters
----------
comp_count : list
Number of components in each sublattice.
pdof : int
Number of points to sample per degree of freedom.

Returns
-------
ndarray of generated points satisfying the mass balance.

Examples
--------
>>> comps = [8,1] # 8 components in sublattice 1; only 1 in sublattice 2
>>> pts = point_sample(comps, pdof=20) # 7 d.o.f, returns a 140x7 ndarray
"""
# Generate Halton sequence with appropriate dimensions and size
pts = halton(sum(comp_count),
pdof * (sum(comp_count) - len(comp_count)), scramble=True)
# Convert low-discrepancy sequence to normalized exponential
# This will be uniformly distributed over the simplices
pts = -np.log(pts)
cur_idx = 0
for ctx in comp_count:
end_idx = cur_idx + ctx
pts[:, cur_idx:end_idx] /= pts[:, cur_idx:end_idx].sum(axis=1)[:, None]
cur_idx = end_idx

if len(pts) == 0:
pts = np.atleast_2d( * len(comp_count))
return pts

[docs]def make_callable(model, variables, mode=None):
"""
Take a SymPy object and create a callable function.

Parameters
----------
model, SymPy object
Abstract representation of function
variables, list
Input variables, ordered in the way the return function will expect
mode, ['numpy', 'numba', 'sympy'], optional
Method to use when 'compiling' the function. SymPy mode is
slow and should only be used for debugging. If Numba is installed,
it can offer speed-ups when calling the energy function many
times on multi-core CPUs.

Returns
-------
Function that takes arguments in the same order as 'variables'
and returns the energy according to 'model'.

Examples
--------
None yet.
"""
energy = None
if mode is None:
mode = 'numpy'

if mode == 'sympy':
energy = lambda *vs: model.subs(zip(variables, vs)).evalf()
else:
energy = lambdify(tuple(variables), model, dummify=True,
modules=mode)

return energy

[docs]def sizeof_fmt(num, suffix='B'):
"""
Human-readable string for a number of bytes.
"""
for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
if abs(num) < 1000.0:
return "%3.1f%s%s" % (num, unit, suffix)
num /= 1000.0
return "%.1f%s%s" % (num, 'Y', suffix)

[docs]def unpack_condition(tup):
"""
Convert a condition to a list of values.

Notes
-----
Rules for keys of conditions dicts:
(1) If it's numeric, treat as a point value
(2) If it's a tuple with one element, treat as a point value
(3) If it's a tuple with two elements, treat as lower/upper limits and guess a step size.
(4) If it's a tuple with three elements, treat as lower/upper/step
(5) If it's a list, ndarray or other non-tuple ordered iterable, use those values directly.

"""
if isinstance(tup, tuple):
if len(tup) == 1:
return [float(tup)]
elif len(tup) == 2:
return np.arange(tup, tup, dtype=np.float)
elif len(tup) == 3:
return np.arange(tup, tup, tup, dtype=np.float)
else:
raise ValueError('Condition tuple is length {}'.format(len(tup)))
elif isinstance(tup, Iterable):
return [float(x) for x in tup]
else:
return [float(tup)]

[docs]def unpack_phases(phases):
"Convert a phases list/dict into a sorted list."
active_phases = None
if isinstance(phases, (list, tuple, set)):
active_phases = sorted(phases)
elif isinstance(phases, dict):
active_phases = sorted([phn for phn, status in phases.items() \
if status == 'entered'])
elif type(phases) is str:
active_phases = [phases]
return active_phases

[docs]def generate_dof(phase, active_comps):
"""
Accept a Phase object and a set() of the active components.
Return a tuple of variable names and the sublattice degrees of freedom.
"""
variables = []
sublattice_dof = []
for idx, sublattice in enumerate(phase.constituents):
dof = 0
for component in sorted(set(sublattice).intersection(active_comps)):
variables.append(v.SiteFraction(phase.name.upper(), idx, component))
dof += 1
sublattice_dof.append(dof)
return variables, sublattice_dof

[docs]def endmember_matrix(dof, vacancy_indices=None):
"""
Accept the number of components in each sublattice.
Return a matrix corresponding to the compositions of all endmembers.

Parameters
----------
dof : list of int
Number of components in each sublattice.
vacancy_indices, list of list of int, optional
If vacancies are present in every sublattice, specify their indices
in each sublattice to ensure the "pure vacancy" endmembers are excluded.

Examples
--------
Sublattice configuration like: (AL, NI, VA):(AL, NI, VA):(VA)
>>> endmember_matrix([3,3,1], vacancy_indices=[, , ])
"""
total_endmembers = functools.reduce(operator.mul, dof, 1)
res_matrix = np.empty((total_endmembers, sum(dof)), dtype=np.float)
dof_arrays = [np.eye(d).tolist() for d in dof]
row_idx = 0
for row in itertools.product(*dof_arrays):
res_matrix[row_idx, :] = np.concatenate(row, axis=0)
row_idx += 1
if vacancy_indices is not None and len(vacancy_indices) == len(dof):
dof_adj = np.array([sum(dof[0:i]) for i in range(len(dof))])
for vacancy_em in itertools.product(*vacancy_indices):
row_idx_to_delete = np.where(np.all(res_matrix[:, indices] == 1,
axis=1))
res_matrix = np.delete(res_matrix, (row_idx_to_delete), axis=0)
# Adjust site fractions to the numerical limit
cur_idx = 0
res_matrix[res_matrix == 0] = MIN_SITE_FRACTION
for ctx in dof:
end_idx = cur_idx + ctx
res_matrix[:, cur_idx:end_idx] /= res_matrix[:, cur_idx:end_idx].sum(axis=1)[:, None]
cur_idx = end_idx
return res_matrix

[docs]def unpack_kwarg(kwarg_obj, default_arg=None):
"""
Keyword arguments in pycalphad can be passed as a constant value, a
dict of phase names and values, or a list containing both of these. If
the latter, then the dict is checked first; if the phase of interest is not
there, then the constant value is used.

This function is a way to construct defaultdicts out of keyword arguments.

Parameters
----------
kwarg_obj : dict, iterable, or None
Argument to unpack into a defaultdict
default_arg : object
Default value to use if iterable isn't specified

Returns
-------
defaultdict for the keyword argument of interest

Examples
--------
>>> test_func = lambda **kwargs: print(unpack_kwarg('opt'))
>>> test_func(opt=100)
>>> test_func(opt={'FCC_A1': 50, 'BCC_B2': 10})
>>> test_func(opt=[{'FCC_A1': 30}, 200])
>>> test_func()
>>> test_func2 = lambda **kwargs: print(unpack_kwarg('opt', default_arg=1))
>>> test_func2()
"""
new_dict = collections.defaultdict(lambda: default_arg)

if isinstance(kwarg_obj, Mapping):
new_dict.update(kwarg_obj)
# kwarg_obj is a list containing a dict and a default
# For now at least, we don't treat ndarrays the same as other iterables
# ndarrays are assumed to be numeric arrays containing "default values", so don't match here
elif isinstance(kwarg_obj, Iterable) and not isinstance(kwarg_obj, np.ndarray):
for element in kwarg_obj:
if isinstance(element, Mapping):
new_dict.update(element)
else:
# element=element syntax to silence var-from-loop warning
new_dict = collections.defaultdict(
lambda element=element: element, new_dict)
elif kwarg_obj is None:
pass
else:
new_dict = collections.defaultdict(lambda: kwarg_obj)

return new_dict

[docs]def unpack_components(dbf, comps):
"""

Parameters
----------
dbf : Database
Thermodynamic database containing elements and species.
comps : list
Names of components to consider in the calculation.

Returns
-------
set
Set of Species objects
"""
# Constrain possible components to those within phase's d.o.f
# Assume for the moment that comps contains a list of pure element strings
# We want to add all the species which can be created by a combination of
# the user-specified pure elements
species_dict = {s.name: s for s in dbf.species}
possible_comps = {v.Species(species_dict.get(x, x)) for x in comps}
desired_active_pure_elements = [list(x.constituents.keys()) for x in possible_comps]
# Flatten nested list
desired_active_pure_elements = [el.upper() for constituents in desired_active_pure_elements for el in constituents]
eligible_species_from_database = {x for x in dbf.species if
set(x.constituents.keys()).issubset(desired_active_pure_elements)}
return eligible_species_from_database

[docs]def get_pure_elements(dbf, comps):
"""
Return a list of pure elements in the system.

Parameters
----------
dbf : Database
A Database object
comps : list
A list of component names (species and pure elements)

Returns
-------
list
A list of pure elements in the Database
"""
comps = sorted(unpack_components(dbf, comps))
components = [x for x in comps]
desired_active_pure_elements = [list(x.constituents.keys()) for x in components]
desired_active_pure_elements = [el.upper() for constituents in desired_active_pure_elements for el in constituents]
pure_elements = sorted(set([x for x in desired_active_pure_elements if x != 'VA']))
return pure_elements

[docs]def filter_phases(dbf, comps, candidate_phases=None):
"""Return phases that are valid for equilibrium calculations for the given database and components

Filters out phases that
* Have no active components in any sublattice of a phase
* Are disordered phases in an order-disorder model

Parameters
----------
dbf : Database
Thermodynamic database containing the relevant parameters.
comps : list of v.Species
Species to consider in the calculation.
candidate_phases : list
Names of phases to consider in the calculation, if not passed all phases from DBF will be considered
Returns
-------
list
Sorted list of phases that are valid for the Database and components
"""
# TODO: filter phases that can not charge balance

def all_sublattices_active(comps, phase):
active_sublattices = [len(set(comps).intersection(subl)) > 0 for
subl in phase.constituents]
return all(active_sublattices)
if candidate_phases == None:
candidate_phases = dbf.phases.keys()
else:
candidate_phases = set(candidate_phases).intersection(dbf.phases.keys())
disordered_phases = [dbf.phases[phase].model_hints.get('disordered_phase') for phase in candidate_phases]
phases = [phase for phase in candidate_phases if
all_sublattices_active(comps, dbf.phases[phase]) and
(phase not in disordered_phases or (phase in disordered_phases and
dbf.phases[phase].model_hints.get('ordered_phase') not in candidate_phases))]
return sorted(phases)

[docs]def extract_parameters(parameters):
"""
Extract symbols and values from parameters.

Parameters
----------
parameters : dict
Dictionary of parameters

Returns
-------
tuple
Tuple of parameter symbols and parameter values
"""
if len(parameters) > 0:
param_symbols, param_values = zip(*[(wrap_symbol(key), val) for key, val in sorted(parameters.items(),
key=operator.itemgetter(0))])
param_values = np.asarray(param_values, dtype=np.float64)
else:
param_symbols = []
param_values = np.empty(0)
return param_symbols, param_values

[docs]def instantiate_models(dbf, comps, phases, model=None, parameters=None, symbols_only=True):
"""

Parameters
----------
dbf : Database
Database used to construct the Model instances.
comps : Iterable
Names of components to consider in the calculation.
phases : Iterable
Names of phases to consider in the calculation.
model : Model class, a dict of phase names to Model, or a Iterable of both
Model class to use for each phase.
parameters : dict, optional
Maps SymPy Symbol to numbers, for overriding the values of parameters in
the Database.
symbols_only : bool
If True, symbols will be extracted from the parameters dict and used to
construct the Model instances.

Returns
-------
dict
Dictionary of Model instances corresponding to the passed phases.
"""
from pycalphad import Model  # avoid cyclic imports
parameters = parameters if parameters is not None else {}
if symbols_only:
parameters, _ = extract_parameters(parameters)
if isinstance(model, Model):  # Check that this instance is compatible with phases
if len(phases) > 1:
raise ValueError("Cannot instantiate models for multiple phases ({}) using a Model instance ({}, phase: {})".format(phases, model, model.phase_name))
else:
if phases != model.phase_name:
raise ValueError("Cannot instantiate models because the desired {} phase does not match the Model instance () {} phase.".format(phases, model.phase_name, model))
models_defaultdict = unpack_kwarg(model, Model)
models_dict = {}
for name in phases:
mod = models_defaultdict[name]
if isinstance(mod, type):
models_dict[name] = mod(dbf, comps, name, parameters=parameters)
else:
models_dict[name] = mod
return models_dict

[docs]def get_state_variables(models=None, conds=None):
"""
Return a set of StateVariables defined Model instances and/or conditions.

Parameters
----------
models : dict, optional
Dictionary mapping phase names to instances of Model objects
conds : dict, optional
Dictionary mapping pycalphad StateVariables to values

Returns
-------
set
State variables that are defined in the models and or conditions.

Examples
--------
>>> from pycalphad import variables as v
>>> get_state_variables(conds={v.P: 101325, v.N: 1, v.X('AL'): 0.2}) == {v.P, v.N, v.T}
True
"""
state_vars = set()
if models is not None:
for mod in models.values():
state_vars.update(mod.state_variables)
if conds is not None:
for c in conds.keys():
# StateVariable instances are ok (e.g. P, T, N, V, S),
# however, subclasses (X, Y, MU, NP) are not ok.
if type(c) is v.StateVariable:
return state_vars

[docs]def wrap_symbol(obj):
if isinstance(obj, Symbol):
return obj
else:
return Symbol(obj)

[docs]def wrap_symbol_symengine(obj):
from symengine import Symbol, sympify
from sympy import Symbol as Symbol_sympy
if isinstance(obj, Symbol):
return obj
elif isinstance(obj, Symbol_sympy):
return sympify(obj)
else:
return Symbol(obj)