Source code for pycalphad.fitting

"""
This module contains routines for fitting new CALPHAD models.
"""

import numpy as np
import xarray
from pycalphad import calculate, equilibrium, Model, Database
from pycalphad.core.utils import make_callable, generate_dof
import pycalphad.variables as v
from sympy import Symbol
import matplotlib.pyplot as plt
import itertools
import functools
from collections import OrderedDict, namedtuple
import json


def setup_dataset(file_obj, dbf, params, mode=None):
    """
    Load a pycalphad JSON dataset and return a tuple of
    (error_func, calc_func, exp_values, data) for fitting.
    """
    # params should be a list of pymc variables corresponding to parameters
    data = json.load(file_obj)
    if data['solver']['mode'] != 'manual':
        raise NotImplementedError
    fit_models = {name: Model(dbf, data['components'], name) for name in data['phases']}
    param_vars = []
    for key, mod in fit_models.items():
        param_vars.extend(sorted(set(mod.ast.atoms(Symbol)) - set(mod.variables), key=str))
    param_vars = sorted(param_vars, key=str)
    if len(params) != len(param_vars):
        raise ValueError('Input parameter vector length doesn\'t match the free parameters'
                         ' in the phase models: {0} != {1}'.format(len(params), len(param_vars)))
    indep_vars = [v.P, v.T]
    site_fracs = {key: sorted(mod.ast.atoms(v.SiteFraction), key=str)
                  for key, mod in fit_models.items()}
    # Call this from a routine that pulls in all datasets and generates the variance vars + Potentials
    callables = {name: make_callable(getattr(mod, data['output']),
                                     itertools.chain(param_vars, indep_vars, site_fracs[name]),
                                     mode=mode)
                 for name, mod in fit_models.items()}
    extra_conds = OrderedDict({key: np.atleast_1d(value)
                               for key, value in data['conditions'].items()})
    exp_values = xarray.DataArray(np.array(data['values'], dtype=float),
                                  dims=list(extra_conds.keys()) + ['points'],
                                  coords=extra_conds)

    def compute_error(*args):
        prefill_callables = {key: functools.partial(*itertools.chain([func], args[:len(params)]))
                             for key, func in callables.items()}
        result = calculate(dbf, data['components'], data['phases'], output=data['output'],
                           points=np.atleast_2d(data['solver']['sublattice_configuration']).astype(float),
                           callables=prefill_callables, model=fit_models, **extra_conds)
        # Eliminate data below 300 K for now
        error = (result[data['output']] - exp_values).sel(T=slice(300, None)).values.flatten()
        return error

    def compute_values(*args):
        prefill_callables = {key: functools.partial(*itertools.chain([func], args[:len(params)]))
                             for key, func in callables.items()}
        result = calculate(dbf, data['components'], data['phases'], output=data['output'],
                           points=np.atleast_2d(data['solver']['sublattice_configuration']).astype(float),
                           callables=prefill_callables, model=fit_models, **extra_conds)
        return result

    return compute_error, compute_values, exp_values, data
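

# A hedged usage sketch for setup_dataset. The TDB file, the JSON dataset, and
# the parameter name 'VV0000' are hypothetical; the only hard requirement is
# that the pymc variables passed in match the free Symbols in the phase models.
def _example_setup_dataset():
    import pymc
    dbf = Database('Al-Zn.tdb')  # hypothetical database with one free parameter
    vv0 = pymc.Normal('VV0000', mu=0.0, tau=1.0e-10)  # diffuse prior for it
    with open('alzn-heat-capacity.json') as file_:  # hypothetical JSON dataset
        error_func, calc_func, exp_data, raw_json = setup_dataset(file_, dbf, [vv0])
    # Residuals (calculated minus measured, T >= 300 K) with VV0000 fixed at 0
    return error_func(0.0)
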
Dataset = namedtuple('Dataset', ['error_func', 'calc_func', 'exp_data', 'json'])

def build_pymc_model(dbf, dataset_names, params):
    """
    Build a pymc (2.x) Model using the specified data and parameters.

    Parameters
    ----------
    dbf : Database
        Database with defined parameters, except for params.
    dataset_names : list of str
        List of paths to pycalphad JSON files.
    params : list of pymc.Stochastic
        Parameters to fit.

    Returns
    -------
    (pymc.Model, list of Dataset)
    """
    # TODO: Figure out a better solution than hiding an import here
    import pymc
    params = sorted(params, key=str)
    dataset_error_funcs = []
    dataset_est_variances = []
    datasets = []
    for idx, fname in enumerate(dataset_names):
        with open(fname) as file_:
            datasets.append(Dataset(*setup_dataset(file_, dbf, params)))
        dataset_est_variances.append(np.var(datasets[-1].exp_data.values))
        dataset_error_funcs.append(datasets[-1].error_func)
    # For 1/var**2, choose gamma(alpha=10*est_var, beta=10)
    #dataset_variance = pymc.Gamma('dataset_variance',
    #                              alpha=np.asarray(dataset_est_variances, dtype=float),
    #                              beta=np.asarray(dataset_est_variances, dtype=float),
    #                              size=len(dataset_names))
    function_namespace = {'zeros_like': np.zeros_like, 'square': np.square,
                          'divide': np.divide,
                          'dataset_est_variances': dataset_est_variances,
                          'dataset_names': dataset_names}
    # TODO: Is there a security issue with passing the output of str(x) to exec?
    function_namespace.update([(str(param), param) for param in params])
    param_kwarg_names = ','.join([str(param) + '=' + str(param) for param in params])
    param_arg_names = ','.join([str(param) for param in params])
    function_namespace.update({'dataset_error_funcs': dataset_error_funcs})
    # Now we have to do some metaprogramming to get the variable names to bind properly
    # This code doesn't yet allow custom distributions for the error
    error_func_code = """def error({0}):
    result = zeros_like(dataset_names, dtype='float')
    for idx in range(len(dataset_names)):
        result[idx] = divide(square(dataset_error_funcs[idx]({1})).mean(),
                             dataset_est_variances[idx])
    return -result.sum()""".format(param_kwarg_names, param_arg_names)
    error_func_code = compile(error_func_code, '<string>', 'exec')
    exec(error_func_code, function_namespace)
    error = pymc.potential(function_namespace['error'])
    mod = pymc.Model([function_namespace[str(param)] for param in params] + [error])
    return mod, datasets
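

# A hedged sketch of driving build_pymc_model with the pymc 2.x sampler. The
# file names and the parameter name are hypothetical, and the chain settings
# are illustrative only, not recommendations.
def _example_build_pymc_model():
    import pymc
    dbf = Database('Al-Zn.tdb')  # hypothetical
    vv0 = pymc.Normal('VV0000', mu=0.0, tau=1.0e-10)
    mod, datasets = build_pymc_model(dbf, ['alzn-heat-capacity.json'], [vv0])
    mcmc = pymc.MCMC(mod)
    mcmc.sample(iter=1000, burn=500)  # short illustrative chain
    return mcmc.trace('VV0000')[:], datasets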


def _map_internal_dof(input_database, components, phase_name, points):
    """
    Map matrix of internal degrees of freedom to global compositions.
    """
    # Map the internal degrees of freedom to global coordinates
    # Normalize site ratios by the sum of site ratios times a factor
    # related to the site fraction of vacancies
    phase_obj = input_database.phases[phase_name]
    site_ratio_normalization = np.zeros(points.shape[:-1])
    phase_compositions = np.empty(points.shape[:-1] + (len(components),))
    variables, sublattice_dof = generate_dof(phase_obj, components)
    for idx, sublattice in enumerate(phase_obj.constituents):
        vacancy_column = np.ones(points.shape[:-1])
        if 'VA' in set(sublattice):
            var_idx = variables.index(v.SiteFraction(phase_obj.name, idx, 'VA'))
            vacancy_column -= points[..., :, var_idx]
        site_ratio_normalization += phase_obj.sublattices[idx] * vacancy_column
    for col, comp in enumerate(components):
        avector = [float(vxx.species == comp) * phase_obj.sublattices[vxx.sublattice_index]
                   for vxx in variables]
        phase_compositions[..., :, col] = np.divide(np.dot(points[..., :, :], avector),
                                                    site_ratio_normalization)
    return phase_compositions
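

# A hedged sketch of what _map_internal_dof computes, assuming a hypothetical
# database whose FCC_A1 phase has the sublattice model (AL,ZN)1(VA)1. A
# site-fraction point ordered [y(AL), y(ZN), y(VA)] maps to mole fractions
# whose columns follow the components list, here ['AL', 'VA', 'ZN'].
def _example_map_internal_dof():
    dbf = Database('Al-Zn.tdb')  # hypothetical
    point = np.atleast_2d([0.4, 0.6, 1.0])  # fully occupied vacancy sublattice
    # For this model the AL and ZN columns come out to 0.4 and 0.6 per mole of atoms
    return _map_internal_dof(dbf, ['AL', 'VA', 'ZN'], 'FCC_A1', point)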


def plot_results(input_database, datasets, params, databases=None):
    """
    Generate figures using the datasets and trace of the parameters.
    A dict of label -> Database objects may be provided as a kwarg.
    """
    # Add extra broadcast dimensions for T, P, and 'points'
    param_tr = [i[None, None, None].T for i in params.values()]

    def plot_key(obj):
        plot = obj.json.get('plot', None)
        return (plot['x'], plot['y']) if plot else None

    datasets = sorted(datasets, key=plot_key)
    databases = dict() if databases is None else databases
    for plot_data_type, data_group in itertools.groupby(datasets, key=plot_key):
        if plot_data_type is None:
            continue
        figure = plt.figure(figsize=(15, 12))
        data_group = list(data_group)
        x, y = plot_data_type
        # All of data_group should be calculating the same thing...
        # Don't show fits below 300 K since they're currently meaningless
        # TODO: Calls to flatten() should actually be slicing operations
        # We can get away with it for now since all datasets will be 2D
        fit = data_group[0].calc_func(*param_tr).sel(T=slice(300, None))
        mu = fit[y].values.mean(axis=0).flatten()
        sigma = 2 * fit[y].values.std(axis=0).flatten()  # 2-sigma credible band
        figure.gca().plot(fit[x].values.flatten(), mu, '-k', label='This work')
        figure.gca().fill_between(fit[x].values.flatten(), mu - sigma, mu + sigma,
                                  color='lightgray')
        for data in data_group:
            plot_label = data.json['plot'].get('name', None)
            figure.gca().plot(data.exp_data[x].values, data.exp_data.values.flatten(),
                              label=plot_label)
        for label, dbf in databases.items():
            # TODO: Relax this restriction
            if data_group[0].json['solver']['mode'] != 'manual':
                continue
            conds = data_group[0].json['conditions']
            conds['T'] = np.array(conds['T'])
            conds['T'] = conds['T'][conds['T'] >= 300.]
            for key in conds.keys():
                if key not in ['T', 'P']:
                    raise ValueError('Invalid conditions in JSON file')
            # To work around differences in sublattice models, relax the internal dof
            global_comps = sorted(set(data_group[0].json['components']) - set(['VA']))
            compositions = \
                _map_internal_dof(input_database,
                                  sorted(data_group[0].json['components']),
                                  data_group[0].json['phases'][0],
                                  np.atleast_2d(
                                      data_group[0].json['solver']['sublattice_configuration']).astype(float))
            # Tiny perturbation to work around a bug in lower_convex_hull (gh-28)
            compare_conds = {v.X(comp): np.add(compositions[:, idx], 1e-4).flatten().tolist()
                             for idx, comp in enumerate(global_comps[:-1])}
            compare_conds.update({v.__dict__[key]: value for key, value in conds.items()})
            # We only want to relax the internal dof at the lowest temperature
            # This will help us capture the most relevant sublattice config since
            # solver mode=manual probably means this is first-principles data
            compare_conds[v.T] = 300.
            eqres = equilibrium(dbf, data_group[0].json['components'],
                                str(data_group[0].json['phases'][0]), compare_conds,
                                verbose=False)
            internal_dof = sum(map(len, dbf.phases[data_group[0].json['phases'][0]].constituents))
            largest_phase_fraction = eqres['NP'].values.argmax()
            eqpoints = eqres['Y'].values[..., largest_phase_fraction, :internal_dof]
            result = calculate(dbf, data_group[0].json['components'],
                               str(data_group[0].json['phases'][0]), output=y,
                               points=eqpoints, **conds)
            # Don't show CALPHAD results below 300 K because they're meaningless right now
            result = result.sel(T=slice(300, None))
            figure.gca().plot(result[x].values.flatten(), result[y].values.flatten(),
                              label=label)
        # Fall back to the raw variable names if no friendly label is defined
        label_mapping = {x: x, y: y}
        label_mapping['CPM'] = 'Molar Heat Capacity (J/mol-atom-K)'
        label_mapping['SM'] = 'Molar Entropy (J/mol-atom-K)'
        label_mapping['HM'] = 'Molar Enthalpy (J/mol-atom)'
        label_mapping['T'] = 'Temperature (K)'
        label_mapping['P'] = 'Pressure (Pa)'
        figure.gca().set_xlabel(label_mapping[x], fontsize=20)
        figure.gca().set_ylabel(label_mapping[y], fontsize=20)
        figure.gca().tick_params(axis='both', which='major', labelsize=20)
        figure.gca().legend(loc='best', fontsize=16)
        figure.canvas.draw()
        yield figure
    plt.show()
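

# A hedged sketch of consuming plot_results with a pymc 2.x trace. All names
# are hypothetical; 'params' maps each parameter name to its sampled trace,
# and 'databases' supplies labeled reference Databases for comparison curves.
def _example_plot_results(input_database, datasets, mcmc):
    params = OrderedDict([('VV0000', mcmc.trace('VV0000')[:])])
    comparison = {'Reference TDB': Database('alzn_reference.tdb')}  # hypothetical
    figs = plot_results(input_database, datasets, params, databases=comparison)
    for idx, figure in enumerate(figs):
        figure.savefig('fit-{0}.png'.format(idx))  # or display interactively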