Source code for pycalphad.mapping.zpf_checks

from typing import Mapping
import logging

import numpy as np

from pycalphad import variables as v
from pycalphad.core.constants import COMP_DIFFERENCE_TOL
from pycalphad.core.composition_set import CompositionSet

from pycalphad.mapping.primitives import ZPFLine, Point, ZPFState
import pycalphad.mapping.zpf_equilibrium as zeq

_log = logging.getLogger(__name__)

"""
Two types of checking functions in this module

Simple checks
    simple_check_valid_point
    simple_check_change_in_phases
    simple_check_global_min

    These quickly check a step result and returns 
    a bool whether the step result is valid or not

    These are intended for exit/direction finding, where we just need to know if 
    an exit/direction is valid

Normal checks
    check_valid_point
    check_axis_values
    check_change_in_phases
    check_global_min
    check_similar_phase_composition

    These will check whether the step result is valid or
    if not valid, then will modify the zpf line or create
    a new node
"""

[docs] def simple_check_valid_point(step_results: tuple[Point, list[CompositionSet]], **kwargs): """ Returns True or False for whether step result was successful equilibria Parameters ---------- step_results : [Point, [CompositionSet]] Results from zpf_equilibrium.update_equilibrium_with_new_conditions Returns ------- bool whether step result is valid or not (None) """ if step_results is None: _log.info("Invalid equilibrium results") return step_results is not None
[docs] def simple_check_change_in_phases(step_results: tuple[Point, list[CompositionSet]], **kwargs): """ Returns True or False for whether step result resulted in same number of phases Parameters ---------- step_results : [Point, [CompositionSet]] Results from zpf_equilibrium.update_equilibrium_with_new_conditions Returns ------- bool whether step result resulted in a phase change """ if step_results is None: return False new_point, orig_cs = step_results num_different_phases = len(set(orig_cs).symmetric_difference(set(new_point.stable_composition_sets))) if num_different_phases != 0: _log.info(f"Number of stable phases changed from {[cs.phase_record.phase_name for cs in orig_cs]} to {new_point.stable_phases}") return num_different_phases == 0
[docs] def simple_check_global_min(step_results: tuple[Point, list[CompositionSet]], **kwargs): """ Returns True or False for whether step result is still global min Parameters ---------- step_results : [Point, [CompositionSet]] Results from zpf_equilibrium.update_equilibrium_with_new_conditions Returns ------- bool whether step result is global min or not """ if step_results is None: return False pdens = kwargs.get("pdens", 500) tol = kwargs.get("tol", 1e-4) system_info = kwargs.get("system_info", None) num_candidates = kwargs.get("global_num_candidates", 1) new_point, orig_cs = step_results global_test_point = zeq.find_global_min_point(new_point, system_info, pdens, tol, num_candidates) if global_test_point is not None: _log.info(f"Point is not global minimum. Current CS: {new_point.stable_phases}, new CS: {global_test_point.stable_phases}") return global_test_point is None
[docs] def check_valid_point(zpf_line: ZPFLine, step_results: tuple[Point, list[CompositionSet]], axis_data: Mapping, **kwargs): """ 3 possible outcomes a) Converged equilibrium -> pass b) Failed equilibrium, reduce axis delta -> don"t add point and attempt stepping again c) Failed equilibrium and axis delta reached minimum -> end zpf line with unexpected ending Parameters ---------- zpf_line : ZPFLine ZPFLine that the point is stepping in step_results : [Point, [CompositionSet]] Results from zpf_equilibrium.update_equilibrium_with_new_conditions axis_data : dict Axis variable data from a map strategy class Returns ------- None - this check does not produce any new nodes However, this will end zpf_line if step_result is invalid """ axis_vars, axis_delta, axis_lims = axis_data["axis_vars"], axis_data["axis_delta"], axis_data["axis_lims"] delta_scale = kwargs.get("delta_scale", 0.5) min_delta_ratio = kwargs.get("min_delta_ratio", 0.1) if step_results is None: _log.info(f"Invalid equilibrium result, reducing step size from {zpf_line.current_delta} -> {zpf_line.current_delta*delta_scale}") zpf_line.current_delta *= delta_scale if zpf_line.current_delta / axis_delta[zpf_line.axis_var] < min_delta_ratio: _log.info(f"Step size has reached minimum {zpf_line.current_delta}/{axis_delta[zpf_line.axis_var]} = {min_delta_ratio}. Failing ZPF line") zpf_line.status = ZPFState.FAILED return None zpf_line.status = ZPFState.ATTEMPT_NEW_STEP return None
def _check_axis_values_within_limit(new_point_vars: dict[v.StateVariable, float], axis_data: Mapping): """ Checks that axis values are within the axis limits NOTE: I think prev_point_vars (unused) and new_point_vars are called this rather than conditions since they are taken from v.StateVariable.compute_property rather than from the global conditions NOTE: There were some issues before with X-C systems where a zpf line containing graphite and another stoichiometric phase would stop midway due to the composition of graphite being at X(C)=1 Adding an offset to slightly extend the axis limits seems to help this issue. Local tests with the Bengt mpea-05 database on all X-C systems (X = Al, Co, Cr, Fe, Ni, Mn) appears to have to have all zpf lines correctly present I will also justify this offset with the following: 1) Stepping will automatically step the stepping variable to be within axis limits 2) For stoichiometric phases at an endmember, finite precision may cause the composition to be slightly above/below 1 (within numerical precision) 3) For non-stoichiometric phases, composition will be limited between 0 and 1 due to the constraints in the minimizer Parameters ---------- prev_point_vars : dict[v.StateVariable, float] Conditions of last point in zpf line new_point_vars : dict[v.StateVariable, float] Conditions of new point axis_data : dict Axis variable information from map strategy Returns ------- bool for whether new point is within axis limits """ axis_vars, _, axis_lims = axis_data["axis_vars"], axis_data["axis_delta"], axis_data["axis_lims"] for av in axis_vars: # Allow some leeway below and above the axis limits. See note in docstring for details offset = 1e-6 if new_point_vars[av] < min(axis_lims[av]) - offset or new_point_vars[av] > max(axis_lims[av]) + offset: _log.info(f"New point outside axis limits. {av} = {new_point_vars[av]}. Limits = {axis_lims[av]}") return False return True def _check_axis_values_by_distance(prev_point_vars: dict[v.StateVariable, float], new_point_vars: dict[v.StateVariable, float], axis_data: Mapping, **kwargs): """ Checks that the normalized distance between the previous point and the new point is within reasonable values NOTE: a threshold of 3 is quite large since the axis swapping should limit this to 1 (give/take some leeway if the swapping hadn't occured yet) NOTE: this should scale with the global_check_interval defined in the map strategy Parameters ---------- prev_point_vars : dict[v.StateVariable, float] Conditions of last point in zpf line new_point_vars : dict[v.StateVariable, float] Conditions of new point axis_data : dict Axis variable information from map strategy Returns ------- bool for whether new point is within distance threshold of previous point """ axis_vars, _, _ = axis_data["axis_vars"], axis_data["axis_delta"], axis_data["axis_lims"] normalize_factor = kwargs.get("normalize_factor", {av: 1 for av in axis_vars}) dist_threshold = kwargs.get("distance_threshold", 3) # Squeezing here since v.MoleFraction.compute_property will return an array instead of a scalar distances = [np.squeeze(np.abs((new_point_vars[av] - prev_point_vars[av])/normalize_factor[av])) for av in axis_vars] dist = np.amax(distances) if dist > dist_threshold: av = axis_vars[np.argmax(distances)] _log.info(f"Axis variable moved more than threshold distance. {av} = {prev_point_vars[av]} -> {new_point_vars[av]}. Distance = {dist*normalize_factor[av]} > {dist_threshold}*{normalize_factor[av]}") return dist < dist_threshold
[docs] def check_axis_values(zpf_line: ZPFLine, step_results: tuple[Point, list[CompositionSet]], axis_data: Mapping, **kwargs): """ Checks both if axis values are within axis limits and if the axis values hadn"t moved to far from previous point The two checks are separated from here so that we can use them in _check_global_min and _check_change_in_phases 3 possible outcomes a) Axes are within limits and minimal distance change -> pass b) Axes are outside limits -> end zpf line with graceful ending c) Distance changed too much -> end zpf line with unexpected ending Parameters ---------- zpf_line : ZPFLine ZPFLine that the point is stepping in step_results : [Point, [CompositionSet]] Results from zpf_equilibrium.update_equilibrium_with_new_conditions axis_data : dict Axis variable data from a map strategy class Returns ------- None - this check does not produce any new nodes This will end zpf_line with FAILED or REACHED_LIMIT depending on test that failed """ if step_results is None: return None axis_vars, axis_delta, axis_lims = axis_data["axis_vars"], axis_data["axis_delta"], axis_data["axis_lims"] new_point, orig_cs = step_results prev_point = zpf_line.points[-1] new_point_vars = {av: new_point.get_property(av) for av in axis_vars} prev_point_vars = {av: prev_point.get_property(av) for av in axis_vars} if not _check_axis_values_by_distance(prev_point_vars, new_point_vars, axis_data, **kwargs): _log.info("Variable more than distance threshold. Failing ZPF line") zpf_line.status = ZPFState.FAILED return None if not _check_axis_values_within_limit(new_point_vars, axis_data): _log.info("Variable reach axis limit. Ending ZPF line") zpf_line.status = ZPFState.REACHED_LIMIT return None
[docs] def check_change_in_phases(zpf_line: ZPFLine, step_results: tuple[Point, list[CompositionSet]], axis_data: Mapping, **kwargs): """ If the number of phases changed upon stepping, then attempt to create a new node If making the new node was unsuccessful, then we end the zpf line anyways 3 possible outcomes a) No change in phases -> pass b) Change in phases, node successfully found -> process new node and end zpf line with graceful ending c) Change in phases, node not found -> end zpf line with unexpected ending Parameters ---------- zpf_line : ZPFLine ZPFLine that the point is stepping in step_results : [Point, [CompositionSet]] Results from zpf_equilibrium.update_equilibrium_with_new_conditions axis_data : dict Axis variable data from a map strategy class Returns ------- new_node : Node Node from step_results if there is a change in phases or None : if step_results does not result in a change in phase or new node cannot be found (zpf_line will be ended) """ if step_results is None: return None axis_vars, axis_delta, axis_lims = axis_data["axis_vars"], axis_data["axis_delta"], axis_data["axis_lims"] do_not_create_node = kwargs.get("do_not_create_node", False) #For testing/debugging purposes new_point, orig_cs = step_results new_point_vars = {av: new_point.get_property(av) for av in axis_vars} num_different_phases = len(set(orig_cs).symmetric_difference(set(new_point.stable_composition_sets))) if num_different_phases != 0: _log.info("Number of phases changed") # By default, assumed the zpf failed. When we successfully find a new node, then we change this zpf_line.status = ZPFState.FAILED # Make sure the set of phases only change by one # Since we step along a single axis variable, we can only fix one additional phase to satisfy Gibbs phase rule if num_different_phases > 1: _log.info("Number of phases changes by more than one. Failing ZPF line") return None if do_not_create_node: new_node = None else: new_node = zeq.create_node_from_different_points(new_point, orig_cs, axis_vars) # If the new node was successfully created, then process the node, otherwise, we unexpectedly ended if new_node is not None: new_node_vars = {av: new_node.get_property(av) for av in axis_vars} # Check that new node satisfy axis limits and distance between nodes # If not, then the zpf line ends unexpectedly check_axis = _check_axis_values_within_limit(new_node_vars, axis_data) check_dist =_check_axis_values_by_distance(new_point_vars, new_node_vars, axis_data, **kwargs) if check_axis and check_dist: _log.info(f"New node found successfully. {new_node.global_conditions}, {new_node.fixed_phases}, {new_node.free_phases}") zpf_line.status = ZPFState.NEW_NODE_FOUND return new_node _log.info("New node could not be found. Failing ZPF line") return None
[docs] def check_global_min(zpf_line: ZPFLine, step_results: tuple[Point, list[CompositionSet]], axis_data: Mapping, **kwargs): """ Check if the point is global minimum 1. Check if a new composition set can be added that can lower free energy 2. Create a new node with the additional composition set 3. Check that the new node is valid 3 possible outcomes a) No change in phases -> pass b) Change in phases, node successfully found -> process new node and end zpf line with graceful ending c) Change in phases, node not found -> end zpf line with unexpected ending Parameters ---------- zpf_line : ZPFLine ZPFLine that the point is stepping in step_results : [Point, [CompositionSet]] Results from zpf_equilibrium.update_equilibrium_with_new_conditions axis_data : dict Axis variable data from a map strategy class Returns ------- new_node : Node Node from step_results if global min is found or None : if step_results is still global min or new node cannot be found (zpf_line will be ended) """ if step_results is None: return None axis_vars, axis_delta, axis_lims = axis_data["axis_vars"], axis_data["axis_delta"], axis_data["axis_lims"] global_check_interval = kwargs.get("global_check_interval", 1) pdens = kwargs.get("pdens", 500) tol = kwargs.get("tol", 1e-4) num_candidates = kwargs.get("global_num_candidates", 1) system_info = kwargs.get("system_info", None) do_not_create_node = kwargs.get("do_not_create_node", False) #For testing/debugging purposes new_point, orig_cs = step_results new_point_vars = {av: new_point.get_property(av) for av in axis_vars} if len(zpf_line.points) % global_check_interval == 0: global_test_point = zeq.find_global_min_point(new_point, system_info, pdens, tol, num_candidates) if global_test_point is not None: _log.info(f"Global min detected. {new_point.stable_phases} -> {global_test_point.stable_phases}") # By default, assumed the zpf failed. When we successfully find a new node, then we change this zpf_line.status = ZPFState.FAILED if do_not_create_node: new_node = None else: new_node = zeq.create_node_from_different_points(global_test_point, new_point.stable_composition_sets, axis_vars) if new_node is not None: new_node_vars = {av: new_node.get_property(av) for av in axis_vars} # Check that new node satisfy axis limits and distance between nodes # If not, then the zpf line ends unexpectedly check_axis = _check_axis_values_within_limit(new_node_vars, axis_data) check_dist =_check_axis_values_by_distance(new_point_vars, new_node_vars, axis_data, **kwargs) # check_dist = True if check_axis and check_dist: zpf_line.status = ZPFState.NEW_NODE_FOUND _log.info(f"New node found successfully. {new_node.global_conditions}, {new_node.fixed_phases}, {new_node.free_phases}") return new_node _log.info("New node could not be found. Failing ZPF line")
[docs] def check_similar_phase_composition(zpf_line: ZPFLine, step_results: tuple[Point, list[CompositionSet]], axis_data: Mapping, **kwargs): """ If two composition sets are close in composition, then we stop zpf line pre-maturely so that they don"t go on top of each other If the composition are the same, it can result in ill-defined matrices in the solver 2 possible outcomes a) Composition sets are separate -> pass b) Composition sets are similar -> end zpf line with unexpected ending Parameters ---------- zpf_line : ZPFLine ZPFLine that the point is stepping in step_results : [Point, [CompositionSet]] Results from zpf_equilibrium.update_equilibrium_with_new_conditions axis_data : dict Axis variable data from a map strategy class Returns ------- None : this check does not attempt to make a new node However, the zpf line will end if this check fails """ if step_results is None: return None new_point, orig_cs = step_results comp_sets = new_point.stable_composition_sets for i in range(len(comp_sets)): for j in range(i+1, len(comp_sets)): same_comp = np.allclose(comp_sets[i].X, comp_sets[j].X, atol=10*COMP_DIFFERENCE_TOL) same_phase = comp_sets[i].phase_record.phase_name == comp_sets[j].phase_record.phase_name if same_comp and same_phase: zpf_line.status = ZPFState.REACHED_LIMIT _log.info(f"Two composition sets have the same composition. Ending ZPF line. {comp_sets[i]} = {comp_sets[j]}") return None return None