from typing import Union
import logging
import itertools
import copy
import numpy as np
from pycalphad import Database, variables as v
from pycalphad.core.composition_set import CompositionSet
from pycalphad.property_framework.computed_property import LinearCombination
from pycalphad.mapping.primitives import ZPFLine, Node, Point, ExitHint, Direction, MIN_COMPOSITION, ZPFState, _eq_compset
from pycalphad.mapping.starting_points import point_from_equilibrium
import pycalphad.mapping.zpf_equilibrium as zeq
import pycalphad.mapping.utils as map_utils
from pycalphad.mapping.strategy.strategy_base import MapStrategy
from pycalphad.mapping.strategy.step_strategy import StepStrategy
from pycalphad.mapping.strategy.strategy_data import PhaseRegionData, get_invariant_data_from_tieline_strategy, get_tieline_data_from_tieline_strategy
_log = logging.getLogger(__name__)
def _get_delta_cs_var(point: Point, comp_sets: list[CompositionSet], axis_vars: list[v.StateVariable], normalize=True):
"""
Given two composition sets, get the vector using axis_vars as coordinates
Seems a bit strange to have the comp sets as an input when we already have the point, but it's just to have some
flexibility on which comp sets to treat at 0 or 1
"""
dx = point.get_local_property(comp_sets[0], axis_vars[0]) - point.get_local_property(comp_sets[1], axis_vars[0])
dy = point.get_local_property(comp_sets[0], axis_vars[1]) - point.get_local_property(comp_sets[1], axis_vars[1])
norm_factor = 1 if not normalize else np.sqrt(dx*dx + dy*dy)
return [dx/norm_factor, dy/norm_factor]
def _get_norm(point: Point, axis_vars: list[v.StateVariable]):
"""
Creates normal vector of a tieline
Assumes point has two phases
"""
vec = _get_delta_cs_var(point, point.stable_composition_sets, axis_vars)
return [-vec[1], vec[0]]
def _create_linear_comb_conditions(point: Point, axis_vars: list[v.StateVariable], normal: list[float] = None):
"""
Creates a linear combination along a normal vector
If normal is not given, then the normal will be the normal of the tieline defined by the point
"""
# Get normal and axis variable to step in (this will be along the maximum of the normal vector)
if normal is None:
normal = _get_norm(point, axis_vars)
# Create a linear combination that forces the stepping to be along the normal
c = normal[1]*point.global_conditions[axis_vars[0]] - normal[0]*point.get_property(axis_vars[1])
lc = LinearCombination(normal[1]*axis_vars[0] - normal[0]*axis_vars[1])
return lc, c
def _sort_point(point: Point, axis_vars: list[v.StateVariable]):
"""
Given a point with 2 free phases, get derivative at both composition sets to
test with CS to fix and which direction to start
NOTE: unlike the binary version of this function, normal is defined from the tieline
rather than the axis variables
"""
_log.info(f"Sorting point {point.fixed_phases}, {point.free_phases}, {point.global_conditions}")
# Free all phases in point, we will fix the phase at the end
free_point = map_utils._generate_point_with_free_cs(point)
# Get normal and axis variable to step in (this will be along the maximum of the normal vector)
normal = _get_norm(free_point, axis_vars)
av = axis_vars[np.argmax(np.abs(normal))]
_log.info(f"Point {point.fixed_phases}, {point.free_phases} with normal {normal}. Testing derivative in {av}")
# This is here to have derivative with respect to the axis variable to follow the direction of the normal
# NOTE: the derivative we compute is not the directional derivative with respect to the normal
# but rather the derivative with respect to the axis variable, with the normal as a fixed condition
# Since we lose the direction information of the normal when taking the derivative, mult_factor is here
# to retain that information
mult_factor = np.sign(normal[axis_vars.index(av)])
# Create a linear combination that forces the stepping to be along the normal
lc, c = _create_linear_comb_conditions(point, axis_vars, normal)
# Create temporary set of conditions that replaces the fixed axis variable with the linear combination
del free_point.global_conditions[axis_vars[1-axis_vars.index(av)]]
free_point.global_conditions[lc] = c
# Test phase composition derivative for each phase stepping along the axis variable fixed along the normal
stable_cs = free_point.stable_composition_sets
cs_options = [[stable_cs[0], stable_cs[1]], [stable_cs[1], stable_cs[0]]]
options_tests = []
for cs_list in cs_options:
for av_test in axis_vars:
v_num = v.X(cs_list[1].phase_record.phase_name, av_test.species)
der = mult_factor*zeq.compute_derivative(free_point, v_num, av, free_den = False)
new_point = map_utils._generate_point_with_fixed_cs(point, cs_list[0], cs_list[1])
options_tests.append((der, new_point, av_test))
# Best index is the one with the largest derivative
best_index = -1
best_der = -1
for i in range(len(options_tests)):
_log.info(f"Option: Axis var {options_tests[i][2]}, derivative {options_tests[i][0]}, point {options_tests[i][1].fixed_phases}, {options_tests[i][1].free_phases}")
if abs(options_tests[i][0]) > best_der:
best_index = i
best_der = abs(options_tests[i][0])
if best_index == -1:
return options_tests[0][0], options_tests[0][1], options_tests[0][2], normal
else:
return options_tests[best_index][0], options_tests[best_index][1], options_tests[best_index][2], normal
[docs]
class TernaryStrategy(MapStrategy):
def __init__(self, dbf: Database, components: list[str], phases: list[str], conditions: dict[v.StateVariable, Union[float, tuple[float]]], **kwargs):
super().__init__(dbf, components, phases, conditions, **kwargs)
# TODO: This assumes pure elements and will likely change with the generalize component support
unlisted_element = list(set(self.components) - {'VA'} - set([str(av.species) for av in self.axis_vars]))[0]
self.all_vars = self.axis_vars + [v.X(unlisted_element)]
[docs]
def generate_automatic_starting_points(self):
"""
Searches axis limits to find starting points
Here, we do a step mapping along the axis bounds and grab all the nodes
The nodes of a step map is distinguished from starting points in that they have a parent
"""
map_kwargs = self._constant_kwargs()
# Iterate through axis variables, and set conditions to fix axis variable at min only
for av in self.axis_vars:
conds = copy.deepcopy(self.conditions)
conds[av] = np.amin(self.axis_lims[av])
# Adjust composition conditions to be slightly above 0 for numerical stability
if isinstance(av, v.X):
if conds[av] == 0:
conds[av] = MIN_COMPOSITION
# Step map
step = StepStrategy(self.dbf, self.components, self.phases, conds, **map_kwargs)
step.do_map()
self.add_starting_points_from_step(step)
# Additional step where we switch axis conditions to the unlisted variable
conds = copy.deepcopy(self.conditions)
conds[self.all_vars[-1]] = MIN_COMPOSITION
del conds[self.axis_vars[0]]
# Step map
step = StepStrategy(self.dbf, self.components, self.phases, conds, **map_kwargs)
step.do_map()
self.add_starting_points_from_step(step)
[docs]
def add_starting_points_from_step(self, step: StepStrategy):
"""
Adds all 2-phase and 3-phase regions from step as starting points
We also do a global min check to make sure these phase regions are truly the phases they say they are
"""
for zpf_line in step.zpf_lines:
if len(zpf_line.stable_phases) == 2 or len(zpf_line.stable_phases) == 3:
num_phases = len(zpf_line.stable_phases)
# Iterate through the zpf line until we find a point with the same number of phases as the
# zpf line itself. This seems redundant, but it is because the first point of the zpf line
# is likely going to be a node or a node parent, where the number of phases may differ by 1
p_index = 0
while len(zpf_line.points[p_index].stable_phases) != num_phases:
p_index += 1
# If we do find a good point on the zpf line, then we can process it to be a starting point
if len(zpf_line.points[p_index].stable_phases) == num_phases:
new_point = zpf_line.points[p_index]
if self.all_vars[-1] in new_point.global_conditions:
del new_point.global_conditions[self.all_vars[-1]]
new_point.global_conditions[self.axis_vars[0]] = new_point.get_property(self.axis_vars[0])
_log.info(f"Adding node {new_point.fixed_phases}, {new_point.free_phases}, {new_point.global_conditions}")
# New point will be make with all composition sets as free, since we set
# which CS to fix when finding an exit
free_point = map_utils._generate_point_with_free_cs(new_point)
cs_result = zeq._find_global_min_cs(free_point, system_info=self.system_info, pdens=self.GLOBAL_MIN_PDENS, tol=self.GLOBAL_MIN_TOL, num_candidates=self.GLOBAL_MIN_NUM_CANDIDATES)
if cs_result is None:
new_node = self._create_node_from_point(free_point, None, None, None)
self.node_queue.add_node(new_node)
def _validate_custom_starting_point(self, point: Point, direction: Direction):
"""
Modifies exit hint and direction based off number of composition sets
Single phase -> cannot be added
Two phase -> point is exit, direction stays the same as input
Three phase -> normal exit finding strategy, no direction needed
This is the same as BinaryStrategy
"""
if len(point.stable_composition_sets) <= 1:
return None, None, "Single phase detected"
elif len(point.stable_composition_sets) == 2:
return ExitHint.POINT_IS_EXIT, direction, None
elif len(point.stable_composition_sets) == 3:
return ExitHint.NORMAL, None, None
else:
return None, None, "More than three phases detected"
def _find_exits_from_node(self, node: Node):
"""
A node in a ternary system has three exits, which are combinations of 2 CS in the node
Since the node is found from one pair of CS, one of the exits are already accounted for, so
practically, it's only 2 exits
However, if there are multiple starting points, a node may be found from multiple zpf lines
thus a node may only have 1 or even 0 exits - (does this mean some exits are repeated if a node is found twice?)
I believe this is the same as the binary strategy
"""
exits, exit_dirs = super()._find_exits_from_node(node)
if node.exit_hint == ExitHint.POINT_IS_EXIT:
return exits, exit_dirs
for cs_1, cs_2 in itertools.combinations(node.stable_composition_sets, 2):
new_conds = {key: node.get_property(key) for key, val in node.global_conditions.items()}
# Create point with free composition sets (and ignore whether cs_1 or cs_2 is actually fixed)
# This will be modified in _determine_start_direction
candidate_point = Point.with_copy(new_conds, node.chemical_potentials, [], [cs_1, cs_2])
if not node.has_point_been_encountered(candidate_point, False):
_log.info(f"Found candidate exit: {candidate_point.fixed_phases}, {candidate_point.free_phases}, {candidate_point.global_conditions}")
exits.append(candidate_point)
# This function is only responsible for finding exit points
# The _determine_start_direction will take the point and find the direction and axis variable
exit_dirs.append(None)
return exits, exit_dirs
def _determine_start_direction(self, node: Node, exit_point: Point, proposed_direction: Direction):
"""
For stepping, only one direction is possible from a node since we either step positive or negative
If a direction cannot be found, then we force add a starting point just past the exit_point
"""
# Sort exit point to fix composition set that varies the least
der, exit_point, axis_var, normal = _sort_point(exit_point, self.axis_vars)
free_cs = exit_point.free_composition_sets[0]
# If node is invariant, then we can get the other cs to know which direction to step away from
# It's possible that the node is not invariant (ex. as a starting point), in which case, we just set the norm_delta_dot to 1
if len(node.stable_composition_sets) == 3:
other_cs = [cs for cs in node.stable_composition_sets if not _eq_compset(cs, exit_point.fixed_composition_sets[0]) and not _eq_compset(cs, exit_point.free_composition_sets[0])][0]
delta_var = _get_delta_cs_var(exit_point, [free_cs, other_cs], self.axis_vars)
norm_delta_dot = np.dot(delta_var, normal)
else:
norm_delta_dot = 1
if proposed_direction is None:
# If the derivative of the axis variable is positive and the normal is the same direction as the delta
# Or if derivative is negative and normal is opposite direction, then stepping is Positive direction
# would be preferred. Same goes for the two other cases where Negative direction is preferred
# We add the other direction just in case the first proposed direction fails
if norm_delta_dot * der > 0:
directions = [Direction.POSITIVE, Direction.NEGATIVE]
else:
directions = [Direction.NEGATIVE, Direction.POSITIVE]
else:
directions = [proposed_direction]
for d in directions:
dir_results = self._test_direction(exit_point, axis_var, d)
if dir_results is not None:
av_delta, other_av_delta = dir_results
_log.info(f"Found direction: {axis_var, d, av_delta} for point {exit_point.fixed_phases}, {exit_point.free_phases}, {exit_point.global_conditions}")
return exit_point, axis_var, d, av_delta
self._add_starting_point_at_new_condition(exit_point, normal, Direction.POSITIVE if norm_delta_dot > 0 else Direction.NEGATIVE)
return None
def _add_starting_point_at_new_condition(self, point: Point, normal: list[float], direction: Direction):
"""
If we made it here, then no direction has worked. This could be a case where
the two CS of the exit point are stoichiometric, so there is no ZPF line leading
to the next node
So we take a step from the normal of the exit away from the third CS of the invariant
and add a new starting point. We also add the current exit as a parent, so we don't
search in that direction again
"""
free_point = map_utils._generate_point_with_free_cs(point)
copy_conds = copy.deepcopy(free_point.global_conditions)
# Move by small amount (1e-3 seem to be a good value, too small, and we may fail to detect a possible third phase and too large, and we may step over a potential node)
for av, norm_dir in zip(self.axis_vars, normal):
copy_conds[av] += 1e-3*norm_dir*direction.value
_log.info(f"Attemping to add point at {copy_conds}")
new_point = point_from_equilibrium(self.dbf, self.components, self.phases, copy_conds, models=self.models, phase_record_factory=self.phase_records)
if new_point is not None:
# If new point is an invariant, then we add it as a starting point (point is added as a parent to prevent repeated exit calculations)
# If new point is 2 phase, then we add it as a starting point the node being the exit
_log.info(f"Adding starting point: {new_point.stable_phases}")
success = False
if len(new_point.stable_composition_sets) == 3:
new_node = self._create_node_from_point(new_point, point, None, None)
success = self.node_queue.add_node(new_node)
elif len(new_point.stable_composition_sets) == 2:
new_node = self._create_node_from_point(new_point, None, None, None, ExitHint.POINT_IS_EXIT)
success = self.node_queue.add_node(new_node)
else:
_log.info("Point could not be starting point. Needs 2 or 3 phases")
if not success:
_log.info("Point has already been added")
else:
_log.info("Could not find a new node from conditions")
def _process_new_node(self, zpf_line: ZPFLine, new_node: Node):
"""
Post global min check after finding node to ensure it's not a metastable node
This is the same as _process_new_node in the BinaryStrategy
"""
_log.info("Checking if new node is metastable")
cs_result = zeq._find_global_min_cs(new_node, system_info=self.system_info, pdens=self.GLOBAL_MIN_PDENS, tol=self.GLOBAL_MIN_TOL, num_candidates=self.GLOBAL_MIN_NUM_CANDIDATES)
if cs_result is None:
_log.info("Global eq check on new node passed")
super()._process_new_node(zpf_line, new_node)
else:
_log.info("Global eq check failed. New node is metastable. Removing current zpf line.")
self.zpf_lines.pop(-1)
[docs]
def get_invariant_data(self, x: v.StateVariable, y: v.StateVariable, global_x: bool = False, global_y: bool = False) -> list[PhaseRegionData]:
"""
Create a dictionary of data for invariant plotting.
Parameters
----------
x : v.StateVariable
The state variable to be used for the x-axis.
y : v.StateVariable
The state variable to be used for the y-axis.
global_x : bool
Whether variable x applies to the global system
global_y : bool
Whether variable y applies to the global system
Returns
-------
list of StrategyData pertaining to each invariant
"""
return get_invariant_data_from_tieline_strategy(self, x, y, global_x, global_y)
[docs]
def get_tieline_data(self, x: v.StateVariable, y: v.StateVariable, global_x: bool = False, global_y: bool = False) -> list[PhaseRegionData]:
"""
Create a dictionary of data for plotting ZPF lines.
Parameters
----------
x : v.StateVariable
The state variable to be used for the x-axis.
y : v.StateVariable
The state variable to be used for the y-axis.
global_x : bool
Whether variable x applies to the global system
global_y : bool
Whether variable y applies to the global system
Returns
-------
list of Strategy pertaining to each tieline
"""
return get_tieline_data_from_tieline_strategy(self, x, y, global_x, global_y)