Source code for pycalphad.mapping.strategy.strategy_base

from typing import Union
import logging
import copy

import numpy as np

from pycalphad import Database, variables as v
from pycalphad.codegen.phase_record_factory import PhaseRecordFactory
from pycalphad.core.utils import instantiate_models, filter_phases, unpack_species, get_pure_elements, get_state_variables
from pycalphad.core.composition_set import CompositionSet


from pycalphad.mapping.primitives import ZPFLine, NodeQueue, Node, Point, ExitHint, Direction, MIN_COMPOSITION, ZPFState
from pycalphad.mapping.starting_points import point_from_equilibrium
import pycalphad.mapping.zpf_equilibrium as zeq
import pycalphad.mapping.zpf_checks as zchk
import pycalphad.mapping.utils as map_utils

_log = logging.getLogger(__name__)

[docs] class MapStrategy: """ Base strategy class for phase diagram construction. Derived Classes --------------- - SteppingStrategy: For single-axis diagrams. - BinaryStrategy: For binary phase diagrams (1 composition, 1 potential axis). - TernaryStrategy: For ternary phase diagrams (2 composition axes). - IsoplethStrategy: For isopleths (tested only for 1 composition, 1 potential axis so far). Constants --------- DELTA_SCALE : float Factor to scale down step size if a single step iteration was unsuccessful (default: 0.5). MIN_DELTA_RATIO : float Minimum step size (as ratio of default) before stopping ZPF line iteration (default: 0.1). GLOBAL_CHECK_INTERVAL : int Number of iterations before global minimum check. Proceed with caution for any interval >1 (default: 1). GLOBAL_MIN_PDENS : int Sampling density for global minimum check (default: 500). GLOBAL_MIN_TOL : float Minimum driving force for a composition set to pass the global minimum check (default: 1e-4). GLOBAL_MIN_NUM_CANDIDATES : int Number of candidates to search through for finding the global minimum. Sometimes, the global minimum can be missed if the sampling is poor, so checking the n-best candidates can help (default: 1). """ def __init__(self, dbf: Database, components: list[str], phases: list[str], conditions: dict[v.StateVariable, Union[float, tuple[float]]], **kwargs): if isinstance(dbf, str): dbf = Database(dbf) self.dbf = dbf # Don't add vacancies to components in case user needs to restrict non-stoichiometric phases self.components = sorted(components) self.elements = get_pure_elements(self.dbf, self.components) self.phases = filter_phases(self.dbf, unpack_species(self.dbf, self.components), phases) self.conditions = copy.deepcopy(conditions) # Add v.N to conditions. Mapping assumes that v.N is in conditions if v.N not in self.conditions: self.conditions[v.N] = 1 self.axis_vars = [key for key, val in self.conditions.items() if len(np.atleast_1d(val)) > 1] composition_sum = sum([conditions[var] for var in conditions if (isinstance(var, v.MoleFraction) and var not in self.axis_vars)]) axis_minimums = {av : np.amin([self.conditions[av][0], self.conditions[av][1]]) if isinstance(av, v.MoleFraction) else 0 for av in self.axis_vars} # For composition variable, adjust max limit to limit the sum of composition to 1 # For mapping in two composition variable, assume that the other variable is at the minimum limit # This is a little intrusive, since we're silently changing a user input, but this should be okay since # it'll give the same result without attempting to compute equilibrium at invalid compositions self.axis_lims = {} self.axis_delta = {} for var in self.axis_vars: self.axis_delta[var] = self.conditions[var][-1] if isinstance(var, v.MoleFraction): # If there are other composition variables, offset the upper limit further by the minimum of the other variables min_limit_sum = sum([axis_minimums[av] for av in axis_minimums if av != var]) upper_limit = np.amin([self.conditions[var][1], 1 - composition_sum - min_limit_sum]) self.axis_lims[var] = (self.conditions[var][0], upper_limit) else: self.axis_lims[var] = (self.conditions[var][0], self.conditions[var][1]) self.models = instantiate_models(self.dbf, self.components, self.phases) state_vars = get_state_variables(self.models, self.conditions) self.num_potential_condition = len([av for av in self.axis_lims if av in state_vars]) self.phase_records = PhaseRecordFactory(self.dbf, self.components, state_vars, self.models) # In case we need to call pycalphad functions outside this class self.system_info = { "dbf": self.dbf, "comps": self.components, "phases": self.phases, "models": self.models, "phase_records": self.phase_records } self.zpf_lines: list[ZPFLine] = [] self.node_queue = NodeQueue() self._current_node = None self._exits = [] self._exit_dirs = [] self._exit_index = 0 # Some default constants self.DELTA_SCALE = kwargs.get("DELTA_SCALE", 0.5) self.MIN_DELTA_RATIO = kwargs.get("MIN_DELTA_RATIO", 0.1) self.GLOBAL_CHECK_INTERVAL = kwargs.get("GLOBAL_CHECK_INTERVAL", 1) self.GLOBAL_MIN_PDENS = kwargs.get("GLOBAL_MIN_PDENS", 500) self.GLOBAL_MIN_TOL = kwargs.get("GLOBAL_MIN_TOL", 1e-4) self.GLOBAL_MIN_NUM_CANDIDATES = kwargs.get("GLOBAL_MIN_NUM_CANDIDATES", 1) def _constant_kwargs(self): """ Creates list of global constants to pass to zpf checks """ const_kwargs = { "DELTA_SCALE": self.DELTA_SCALE, "MIN_DELTA_RATIO": self.MIN_DELTA_RATIO, "GLOBAL_CHECK_INTERVAL": self.GLOBAL_CHECK_INTERVAL, "GLOBAL_MIN_PDENS": self.GLOBAL_MIN_PDENS, "GLOBAL_MIN_TOL": self.GLOBAL_MIN_TOL, "GLOBAL_MIN_NUM_CANDIDATES": self.GLOBAL_MIN_NUM_CANDIDATES, } return const_kwargs
[docs] def get_all_phases(self): """ Goes through ZPF lines to get all unique phases. For miscibility gaps, phases will have #n added to it In some cases, there might be no ZPF lines (e.g. ternaries with all line compounds), in which case, we return an empty set There should always be nodes in the node_queue since it includes starting points (even if they're not nodes in the mapping sense) """ if len(self.zpf_lines) > 0: zpf_phases = set.union(*[set(zpf_line.stable_phases_with_multiplicity) for zpf_line in self.zpf_lines]) else: zpf_phases = set() if len(self.node_queue.nodes) > 0: node_phases = set.union(*[set(node.stable_phases_with_multiplicity) for node in self.node_queue.nodes]) else: node_phases = set() return list(set.union(zpf_phases, node_phases))
[docs] def normalize_factor(self, av): """ Since potential and composition variables are on different scales, we normalize by the delta value """ return self.axis_delta[av]
def _other_av(self, av): """ Returns other axis variable if there are two free variables """ if len(self.axis_vars) == 1: return None else: return self.axis_vars[1-self.axis_vars.index(av)]
[docs] def add_nodes_from_conditions(self, conditions: dict[v.StateVariable, float], direction: Direction = None, force_add: bool = True): """ Computes equilibrium and creates a point from input conditions If a direction is supplied, then we can just add the new point with the direction If not, then we add two points, for each direction Since this is a starting point, we set the ExitHint to POINT_IS_EXIT since we assume the starting point is not a true node Also by default, we force add the node to skip checking if the node is already in the node queue """ point = point_from_equilibrium(self.dbf, self.components, self.phases, conditions, models=self.models, phase_record_factory=self.phase_records) if point is None: _log.warning(f"Point could not be found from {conditions}") return False if direction is None: _log.info(f"No direction is given, adding point from {conditions} with both directions") self.node_queue.add_node(self._create_node_from_point(point, None, None, Direction.POSITIVE, ExitHint.POINT_IS_EXIT), force_add) self.node_queue.add_node(self._create_node_from_point(point, None, None, Direction.NEGATIVE, ExitHint.POINT_IS_EXIT), force_add) else: self.node_queue.add_node(self._create_node_from_point(point, None, None, direction, ExitHint.POINT_IS_EXIT), force_add) return True
def _create_node_from_point(self, point: Point, parent: Point, start_ax: v.StateVariable, start_dir: Direction, exit_hint: ExitHint = ExitHint.NORMAL): """ Given a point and a parent, create a node with a starting axis/direction Note: parent can be None, which allows for distinguishing if a node came from a starting point or from mapping """ new_node = Node(point.global_conditions, point.chemical_potentials, point.fixed_composition_sets, point.free_composition_sets, parent) new_node.axis_var = start_ax new_node.axis_direction = start_dir new_node.exit_hint = exit_hint return new_node
[docs] def iterate(self): """ Hierarchy of mapping iterations 1. Node queue If not empty, then create exits from next node 2. Node exits If exits remain, then create new zpf line from next exit 3. ZPF line If not finished, continue ZPF line """ if len(self.zpf_lines) > 0 and self.zpf_lines[-1].status == ZPFState.NOT_FINISHED: self._continue_zpf_line() elif len(self._exits) > self._exit_index: _log.info("No zpf line or last zpf line has finished. Attempting to start zpf line from next exit") # This will start a new zpf line that is not finished self._start_zpf_line() self._exit_index += 1 elif self.node_queue.size() > 0: _log.info("No more exits from current node. Attempting to get next node") # This will reset the _exits list and _exitIndex to 0 self._find_node_exits() else: # Mapping is finished once the last zpf line is finished, # the last node has no more exits and the node queue has no more nodes _log.info("No more nodes in queue. Ending mapping.") return True return False
[docs] def do_map(self): """ Wrapper over iterate to run until finished """ finished = False while not finished: finished = self.iterate()
def _continue_zpf_line(self): """ 1. If more than two free axis, check if axis needs to be swapped Ideally, we want the axis that will result in minimal change in the other axes 2. Take step along current axis and direction 3. Check if the new point generated from step can be added to the zpf line _attempt_to_add_point will account for invalid points and detecting new nodes """ # Test if axis needs to be swapped self._test_swap_axis(self.zpf_lines[-1]) # Take step step_results = self._step_zpf_line(self.zpf_lines[-1]) # Check new point and try to add it to zpf line self._attempt_to_add_point(self.zpf_lines[-1], step_results) def _test_swap_axis(self, zpf_line: ZPFLine): """ If there is only one axis variable, then we don't need to test for swapping the axis For more than 2 axis, we do a comparison of how much each axis variable changed in the last two steps, then change to step in the axis variable that changed the most """ if len(self.axis_vars) == 1: return else: if len(zpf_line.points) > 1: # Get change in axis variable for both variables curr_point = zpf_line.points[-1] prev_point = zpf_line.points[-2] dv = [(curr_point.get_property(av) - prev_point.get_property(av))/self.normalize_factor(av) for av in self.axis_vars] # We want to step in the axis variable that changes the most (that way the change in the other variable will be minimal) # We also can get the direction from the change in variable index = np.argmax(np.abs(dv)) direction = Direction.POSITIVE if dv[index] > 0 else Direction.NEGATIVE if zpf_line.axis_var != self.axis_vars[index]: _log.info(f"Swapping axis to {self.axis_vars[index]}. ZPF vector {dv} {self.axis_vars}") # Since we check the change in axis variable at the current delta, we'll retain the same delta # when switching axis variable (same delta as a ratio of the initial delta) delta_scale = zpf_line.current_delta / self.axis_delta[zpf_line.axis_var] zpf_line.axis_var = self.axis_vars[index] zpf_line.axis_direction = direction zpf_line.current_delta = self.axis_delta[zpf_line.axis_var] * delta_scale def _step_conditions(self, point: Point, axis_var: v.StateVariable, axis_delta: float, axis_lims: tuple[float], direction: Direction): """ Creates a copy of condition and steps in proposed direction If the axis variable is outside the defined limits, then we adjust the conditions to be at the edge of the axis limits """ new_conds = copy.deepcopy(point.global_conditions) new_conds[axis_var] += axis_delta*direction.value hit_axis_limit = False # Offset (for composition, this pushes the axis variable to be slightly off the limits to avoid pure components) offset = 0 if map_utils.is_state_variable(axis_var) else MIN_COMPOSITION if new_conds[axis_var] > max(axis_lims) - offset: new_conds[axis_var] = max(axis_lims) - offset hit_axis_limit = True if new_conds[axis_var] < min(axis_lims) + offset: new_conds[axis_var] = min(axis_lims) + offset hit_axis_limit = True return new_conds, hit_axis_limit def _take_step(self, point: Point, axis_var: v.StateVariable, axis_delta: float, axis_lims: tuple[float], direction: Direction): """ Adjust conditions and compute equilibrium, this will return a new point (or None if equilibrium fails) """ # Here we don't care if stepping reached the axis limit since we check this afterwards new_conds, hit_axis_limit = self._step_conditions(point, axis_var, axis_delta, axis_lims, direction) if hit_axis_limit: return None _log.info(f"Stepping point {point.fixed_phases}, {point.free_phases}, {point.global_conditions} along {axis_var}, {axis_delta*direction.value}") return zeq.update_equilibrium_with_new_conditions(point, new_conds, self._other_av(axis_var)) def _step_zpf_line(self, zpf_line: ZPFLine): """ This is the same as _take_step except we use the conditions from the current zpf line """ return self._take_step(zpf_line.points[-1], zpf_line.axis_var, zpf_line.current_delta, self.axis_lims[zpf_line.axis_var], zpf_line.axis_direction) def _attempt_to_add_point(self, zpf_line: ZPFLine, step_results: tuple[Point, list[CompositionSet]]): """ Go through list of check functions to see if the new point can be added Rules 1. All check functions will take in the current zpf line and a tuple(new_point, [orig_cs]) 2. If the check function returns True, it means the check has passed and we move onto the next check 3. If the check function returns False, we stop checking and end this function a. Fails because equilibrium did not converge or invalid condition -> adjust axis delta and try again b. Fails because we found a node -> backtrack zpf line to remove offending points and add node parent to zpf line 4. If all check functions pass, then we add the point Not a fan of how this is implemented, but I want the API for each check function to be the same, with extra args having default values if not supplied """ check_functions = [zchk.check_valid_point, zchk.check_change_in_phases, zchk.check_global_min, zchk.check_axis_values, zchk.check_similar_phase_composition] axis_data = { "axis_vars": self.axis_vars, "axis_delta": self.axis_delta, "axis_lims": self.axis_lims } extra_args = { "delta_scale": self.DELTA_SCALE, "min_delta_ratio": self.MIN_DELTA_RATIO, "global_check_interval": self.GLOBAL_CHECK_INTERVAL, "global_num_candidates": self.GLOBAL_MIN_NUM_CANDIDATES, "normalize_factor": {av: self.normalize_factor(av) for av in self.axis_vars}, "system_info": self.system_info, "pdens": self.GLOBAL_MIN_PDENS, "tol": self.GLOBAL_MIN_TOL } for check in check_functions: new_node = check(zpf_line, step_results, axis_data, **extra_args) # If we found a new node, then process it and end zpf line if zpf_line.status == ZPFState.NEW_NODE_FOUND: self._process_new_node(zpf_line, new_node) return # If equilibrium failed, then reset zpf line so it can step from previous point elif zpf_line.status == ZPFState.ATTEMPT_NEW_STEP: zpf_line.status = ZPFState.NOT_FINISHED return # If anything occurs that interrupts zpf line, break out of the loop # Whether we add or don't add point will be decided later elif zpf_line.status != ZPFState.NOT_FINISHED: _log.info(f"ZPF line {zpf_line.fixed_phases}, {zpf_line.free_phases} ended with {zpf_line.status}") break # If the zpf line reached a limit (axis limits or similar phase composition), then we # can still add the point valid_statuses = [ZPFState.NOT_FINISHED, ZPFState.REACHED_LIMIT] if zpf_line.status in valid_statuses: new_point, _ = step_results zpf_line.append(new_point) # If we successfully added a new point, attempt to increase the axis delta if is smaller than the default if zpf_line.current_delta < self.axis_delta[zpf_line.axis_var]: zpf_line.current_delta = np.amin([self.axis_delta[zpf_line.axis_var], zpf_line.current_delta / self.DELTA_SCALE]) def _process_new_node(self, zpf_line: ZPFLine, new_node: Node): """ Back tracks zpf line until the angle between the last edge on the zpf line and the edge connect to the node is greater than 90 (dot product > 0) TODO: while this should allow for performing global min check every couple iterations, this is very iffy. It's fine for now since the global check interval defaults to 1 """ _log.info("Back tracking zpf line to add node") node_pos = np.array([new_node.get_property(av) for av in self.axis_vars]) orig_len = len(zpf_line.points) for i in range(len(zpf_line.points)-1, 0, -1): p1 = zpf_line.points[i] p2 = zpf_line.points[i-1] p1_pos = np.array([p1.get_property(av) for av in self.axis_vars]) p2_pos = np.array([p2.get_property(av) for av in self.axis_vars]) v21 = p1_pos - p2_pos vnode1 = node_pos - p1_pos if np.dot(v21, vnode1) < 0: del zpf_line.points[i] else: break final_len = len(zpf_line.points) _log.info(f"Removed zpf points from {orig_len} to {final_len}") zpf_line.append(new_node.parent) # Set axis variable and direction from the previous zpf line # So we know where the node came from (this can give some hints for exit finding) new_node.axis_var = zpf_line.axis_var new_node.axis_direction = zpf_line.axis_direction # Add to node queue # For stepping, we will force add. Most nodes will be unique, however, if we're stepping along composition # in a binary, then the nodes won't be unique if len(self.axis_vars) == 1: self.node_queue.add_node(new_node, True) else: if not self.node_queue.add_node(new_node): _log.info(f"Node {new_node.fixed_phases}, {new_node.free_phases} has already been added") def _start_zpf_line(self): """ 1. Gets current exit from the node 2. Find start direction 3. Create new zpf line with exit and direction information """ # Find start direction from current exit direction_data = self._determine_start_direction(self._current_node, self._exits[self._exit_index], self._exit_dirs[self._exit_index]) # If no direction can be found, then move to the next exit if direction_data is None: return # Create node from exit and start direction exit_point, start_ax, start_dir, start_delta = direction_data # Initialize zpf line self.zpf_lines.append(ZPFLine(exit_point.fixed_phases, exit_point.free_phases)) self.zpf_lines[-1].points.append(exit_point) self.zpf_lines[-1].axis_var = start_ax self.zpf_lines[-1].axis_direction = start_dir self.zpf_lines[-1].current_delta = start_delta _log.info(f"Starting zpf line with {self.zpf_lines[-1].points[-1].global_conditions}, {self.zpf_lines[-1].points[-1].fixed_phases}, {self.zpf_lines[-1].points[-1].free_phases} and {self.zpf_lines[-1].axis_var}, {self.zpf_lines[-1].current_delta*self.zpf_lines[-1].axis_direction.value}") _log.info(f"ZPF starting from node {self._current_node.fixed_phases}, {self._current_node.free_phases}, {self._current_node.global_conditions}") def _determine_start_direction(self, node: Node, exit_point: Point, proposed_direction: Direction = None): """ From an exit point, this will return an axis variable, direction and delta that is suggested to give a good starting direction to a ZPF line If proposed_direction is None, this will test both positive and negative, otherwise, it will only test the proposed direction """ raise NotImplementedError() def _find_node_exits(self): """ 1. Get next node in queue 2. Find exits from node (implemented for each strategy) """ # Get new node from node queue self._current_node = self.node_queue.get_next_node() _log.info(f"Finding exits from node {self._current_node.fixed_phases}, {self._current_node.free_phases}, {self._current_node.global_conditions}") # Find all exits from node self._exits, self._exit_dirs = self._find_exits_from_node(self._current_node) exit_info = [f"Point: {ex.fixed_phases}, {ex.free_phases}, {ex_dir}" for ex, ex_dir in zip(self._exits, self._exit_dirs)] _log.info(f"Found exits {exit_info}") self._exit_index = 0 def _find_exits_from_node(self, node: Node): """ Default exit function which uses the node as an exit if the ExitHint says so Derived classes will check if the return of this is empty arrays and if they are, then they"ll fill them up their own way """ exits = [] exit_dirs = [] if node.exit_hint == ExitHint.POINT_IS_EXIT: _log.info(f"Using node {node.fixed_phases}, {node.free_phases}, {node.global_conditions} as the exit point with directions {node.axis_direction}") if node.axis_direction is None: exits.append(node) exit_dirs.append(None) else: exits.append(node) exit_dirs.append(node.axis_direction) return exits, exit_dirs def _test_direction(self, point: Point, axis_var: v.StateVariable, direction: Direction): """ Given an axis variable and direction, test that we can step from the point If stepping doesn"t work, we continue to reduce the step size until it can (or until the step size is too small) We check whether stepping is possible by checking if equilibrium converged, if it"s still global min and if the number of phases stayed the same We"ll skip the other two checks (axis lims/distance and similar compositions) since technically, these are valid points that can be added """ # For stepping, there is no other variable # For tielines and isopleths, we keep track of the other av # This is mainly to check which axis variable changes the least when stepping # Where we want to be stepping in the variable that changes the other variable the least if len(self.axis_vars) == 1: other_av, other_av_val = None, None else: other_av = self.axis_vars[1-self.axis_vars.index(axis_var)] other_av_val = point.get_property(other_av) # Set the starting delta to be the minimum (delta * min_delta_ratio). Then if direction is # successful, we can scale the delta after successful zpf line iterations curr_delta = self.axis_delta[axis_var] * self.MIN_DELTA_RATIO new_conds, hit_axis_limit = self._step_conditions(point, axis_var, curr_delta, self.axis_lims[axis_var], direction) if not hit_axis_limit: step_results = zeq.update_equilibrium_with_new_conditions(point, new_conds, self._other_av(axis_var)) extra_args = { "system_info": self.system_info, "pdens": self.GLOBAL_MIN_PDENS, "tol": self.GLOBAL_MIN_TOL } # Check valid equilibrium, global min and change in phases check_functions = [zchk.simple_check_valid_point, zchk.simple_check_change_in_phases, zchk.simple_check_global_min] valid_point = True for checks in check_functions: if not checks(step_results, **extra_args): valid_point = False break # If valid point, then record the change in the other axis and return both deltas if valid_point: other_av_delta = None new_point, orig_cs = step_results if other_av is not None: new_other_av_val = new_point.get_property(other_av) other_av_delta = abs(other_av_val - new_other_av_val) return curr_delta, other_av_delta # If stepping failed, then returned None _log.info(f"Stepping point {point.fixed_phases}, {point.free_phases}, {point.global_conditions}, {direction} failed with step size {curr_delta}.") return None