The tdb module provides support for reading and writing databases in
Thermo-Calc TDB format.
from pyparsing import CaselessKeyword, CharsNotIn, Group
from pyparsing import LineEnd, MatchFirst, OneOrMore, Optional, SkipTo
from pyparsing import ZeroOrMore, Suppress, White, Word, alphanums, alphas, nums
from pyparsing import delimitedList, ParseException
import re
from symengine.lib.symengine_wrapper import UniversalSet, Union, Complement
from symengine import sympify, And, Or, Not, EmptySet, Interval, Piecewise, Add, Mul, Pow
from symengine import Float, Symbol, LessThan, StrictLessThan, S, E
from tinydb import where
from pycalphad import Database
from pycalphad.io.database import DatabaseExportError
from pycalphad.io.grammar import float_number, chemical_formula
from pycalphad.variables import Species
import pycalphad.variables as v
from pycalphad.io.tdb_keywords import expand_keyword, TDB_PARAM_TYPES
from pycalphad.core.utils import generate_symmetric_group
from collections import defaultdict, namedtuple
import ast
import sys
import inspect
import functools
import itertools
import getpass
import datetime
import warnings
import hashlib
from copy import deepcopy
_AST_WHITELIST = [ast.Add, ast.BinOp, ast.Call, ast.Constant, ast.Div,
ast.Expression, ast.Load, ast.Mult, ast.Name,
ast.Pow, ast.Sub, ast.UAdd, ast.UnaryOp, ast.USub]
def _sympify_string(math_string):
"Convert math string into SymEngine object."
# drop pound symbols ('#') since they denote function names
# we detect those automatically
expr_string = math_string.replace('#', '')
# sympify doesn't recognize LN as ln()
expr_string = \
re.sub(r'(?<!\w)LN(?!\w)', 'ln', expr_string, flags=re.IGNORECASE)
expr_string = \
re.sub(r'(?<!\w)LOG(?!\w)', 'log', expr_string, flags=re.IGNORECASE)
expr_string = \
re.sub(r'(?<!\w)EXP(?!\w)', 'exp', expr_string,
# sympify uses eval, so we need to sanitize the input
nodes = ast.parse(expr_string)
nodes = ast.Expression(nodes.body[0].value)
for node in ast.walk(nodes):
if type(node) not in _AST_WHITELIST: #pylint: disable=W1504
raise ValueError('Expression from TDB file not in whitelist: '
return sympify(expr_string).xreplace(get_supported_variables()).n()
def _parse_action(func):
Decorator for pyparsing parse actions to ease debugging.
pyparsing uses trial & error to deduce the number of arguments a parse
action accepts. Unfortunately any ``TypeError`` raised by a parse action
confuses that mechanism.
This decorator replaces the trial & error mechanism with one based on
reflection. If the decorated function itself raises a ``TypeError`` then
that exception is re-raised if the wrapper is called with less arguments
than required. This makes sure that the actual ``TypeError`` bubbles up
from the call to the parse action (instead of the one caused by pyparsing's
trial & error).
Modified slightly from the original for Py3 compatibility
Source: Florian Brucker on StackOverflow
func_items = inspect.signature(func).parameters.items()
func_args = [name for name, param in func_items
if param.kind == param.POSITIONAL_OR_KEYWORD]
num_args = len(func_args)
if num_args > 3:
raise ValueError('Input function must take at most 3 parameters.')
def action(*args):
"Wrapped function."
if len(args) < num_args:
if action.exc_info:
raise action.exc_info[0](action.exc_info[1], action.exc_info[2])
action.exc_info = None
return func(*args[:-(num_args + 1):-1])
except TypeError as err:
action.exc_info = sys.exc_info()
raise err
action.exc_info = None
return action
def _make_piecewise_ast(toks):
Convenience function for converting tokens into a piecewise symengine object.
cur_tok = 0
expr_cond_pairs = []
# Only one token: Not a piecewise function; just return the AST
if len(toks) == 1:
return _sympify_string(toks[0].strip(' ,'))
while cur_tok < len(toks)-1:
low_temp = toks[cur_tok]
high_temp = toks[cur_tok+2]
except IndexError:
# No temperature limit specified
high_temp = None
if high_temp is None:
And(low_temp <= v.T)
And(low_temp <= v.T, v.T < high_temp)
cur_tok = cur_tok + 2
expr_cond_pairs.append((0, True))
return Piecewise(*expr_cond_pairs)
class TCCommand(CaselessKeyword): #pylint: disable=R0903
Parser element for dealing with Thermo-Calc command abbreviations.
def parseImpl(self, instring, loc, doActions=True):
# Find the end of the keyword by searching for an end character
start = loc
endchars = ' ():,'
loc = -1
for charx in endchars:
locx = instring.find(charx, start)
if locx != -1:
# match the end-character closest to the start character
if loc != -1:
loc = min(loc, locx)
loc = locx
# if no end character found, just match the whole thing
if loc == -1:
loc = len(instring)
res = expand_keyword([self.match], instring[start:loc])
if len(res) > 1:
self.errmsg = '{0!r} is ambiguous: matches {1}' \
.format(instring[start:loc], res)
raise ParseException(instring, loc, self.errmsg, self)
# res[0] is the unambiguous expanded keyword
# in principle, res[0] == self.match
return loc, res[0]
except ValueError:
raise ParseException(instring, loc, self.errmsg, self)
def _tdb_grammar(): #pylint: disable=R0914
Convenience function for getting the pyparsing grammar of a TDB file.
int_number = Word(nums).setParseAction(lambda t: [int(t[0])])
# symbol name, e.g., phase name, function name
symbol_name = Word(alphanums+'_:', min=1)
ref_phase_name = symbol_name = Word(alphanums+'_-:()/', min=1)
# species name, e.g., CO2, AL, FE3+
species_name = Word(alphanums+'+-*/_.', min=1) + Optional(Suppress('%'))
reference_key = Word(alphanums+':_-')('reference_key')
# constituent arrays are colon-delimited
# each subarray can be comma- or space-delimited
constituent_array = Group(delimitedList(Group(OneOrMore(Optional(Suppress(',')) + species_name)), ':'))
param_types = MatchFirst([TCCommand(param_type) for param_type in TDB_PARAM_TYPES])
# Let symengine do heavy arithmetic / algebra parsing for us
# a convenience function will handle the piecewise details
func_expr = (float_number | ZeroOrMore(',').setParseAction(lambda t: 0.01)) + OneOrMore(SkipTo(';') \
+ Suppress(';') + ZeroOrMore(Suppress(',')) + Optional(float_number) + \
Suppress(Optional(Word('Yy', exact=1))), stopOn=Word('Nn', exact=1)) + Suppress(Optional(Word('Nn', exact=1)))
cmd_element = TCCommand('ELEMENT') + Word(alphas+'/-', min=1, max=2) + ref_phase_name + \
float_number + float_number + float_number + LineEnd()
cmd_species = TCCommand('SPECIES') + species_name + chemical_formula + LineEnd()
cmd_typedef = TCCommand('TYPE_DEFINITION') + \
Suppress(White()) + CharsNotIn(' !', exact=1) + SkipTo(LineEnd())
cmd_function = TCCommand('FUNCTION') + symbol_name + \
func_expr.setParseAction(_make_piecewise_ast) + \
Optional(Suppress(reference_key)) + LineEnd()
cmd_ass_sys = TCCommand('ASSESSED_SYSTEMS') + SkipTo(LineEnd())
cmd_defsysdef = TCCommand('DEFINE_SYSTEM_DEFAULT') + SkipTo(LineEnd())
cmd_defcmd = TCCommand('DEFAULT_COMMAND') + SkipTo(LineEnd())
cmd_database_info = TCCommand('DATABASE_INFO') + SkipTo(LineEnd())
cmd_version_date = TCCommand('VERSION_DATE') + SkipTo(LineEnd())
cmd_reference_file = TCCommand('REFERENCE_FILE') + SkipTo(LineEnd())
cmd_add_ref = TCCommand('ADD_REFERENCES') + SkipTo(LineEnd())
cmd_lor = TCCommand('LIST_OF_REFERENCES') + SkipTo(LineEnd())
cmd_templim = TCCommand('TEMPERATURE_LIMITS') + SkipTo(LineEnd())
cmd_phase = TCCommand('PHASE') + symbol_name + \
Suppress(White()) + CharsNotIn(' !', min=1) + Suppress(White()) + \
Suppress(int_number) + Group(OneOrMore(float_number)) + \
cmd_constituent = TCCommand('CONSTITUENT') + symbol_name + \
Suppress(White()) + Suppress(':') + constituent_array + \
Suppress(':') + LineEnd()
cmd_parameter = TCCommand('PARAMETER') + param_types + \
Suppress('(') + symbol_name + \
Optional(Suppress('&') + Word(alphas+'/-', min=1, max=2), default=None) + \
Suppress(',') + constituent_array + \
Optional(Suppress(';') + int_number, default=0) + \
Suppress(')') + func_expr.setParseAction(_make_piecewise_ast) + \
Optional(Suppress(reference_key)) + LineEnd()
cmd_zerovolume = TCCommand('ZEROVOLUME_SPECIES') + SkipTo(LineEnd())
cmd_diffusion = TCCommand('DIFFUSION') + SkipTo(LineEnd())
# Now combine the grammar together
all_commands = cmd_element | \
cmd_species | \
cmd_typedef | \
cmd_function | \
cmd_ass_sys | \
cmd_defsysdef | \
cmd_defcmd | \
cmd_database_info | \
cmd_version_date | \
cmd_reference_file | \
cmd_add_ref | \
cmd_lor | \
cmd_templim | \
cmd_phase | \
cmd_constituent | \
cmd_parameter | \
cmd_zerovolume | \
return all_commands
def _process_typedef(targetdb, typechar, line):
Process a TYPE_DEFINITION command.
Assumes all phases are entered into the database already and that the
database defines _typechar_map, which defines a map of typechar to the
phases that use it. Any phases that in the typechar dict for this will have
the model_hints updated based on this type definition, regardless of which
phase names may be defined in this TYPE_DEF line.
matching_phases = targetdb._typechar_map[typechar]
del targetdb._typechar_map[typechar]
tokens = line.replace(',', '').split()
if len(tokens) < 4:
#Don't process IF-THEN type definitions for now
if 'IF' in tokens or 'THEN' in tokens:
warnings.warn("Type definitions using IF/THEN logic is not supported")
keyword = expand_keyword(['DISORDERED_PART', 'MAGNETIC'], tokens[3].upper())[0]
if len(keyword) == 0:
raise ValueError('Unknown type definition keyword: {}'.format(tokens[3]))
if len(matching_phases) == 0:
warnings.warn(f"The type definition character `{typechar}` in `TYPE_DEFINITION {typechar} {line}` is not used by any phase.")
if keyword == 'MAGNETIC':
# Magnetic model, both IHJ and Xiong models use these model hints when
# constructing Model instances, despite being prefixed `ihj_magnetic_`
model_hints = {
'ihj_magnetic_afm_factor': float(tokens[4]),
'ihj_magnetic_structure_factor': float(tokens[5])
for phase_name in matching_phases:
if keyword == 'DISORDERED_PART':
# order-disorder model: since we need to add model_hints to both the
# ordered and disorderd phase, we special case to update the phase
# names defined by the TYPE_DEF, rather than the updating the phases
# with matching typechars.
ordered_phase = tokens[2].upper()
disordered_phase = tokens[4].upper()
hint = {
'ordered_phase': ordered_phase,
'disordered_phase': disordered_phase,
if ordered_phase in targetdb.phases:
raise ValueError(f"The {ordered_phase} phase is not in the database, but is defined by: `TYPE_DEFINTION {typechar} {line}`")
if disordered_phase in targetdb.phases:
raise ValueError(f"The {disordered_phase} phase is not in the database, but is defined by: `TYPE_DEFINTION {typechar} {line}`")
phase_options = {'ionic_liquid_2SL': 'Y',
'symmetry_FCC_4SL': 'F',
'symmetry_BCC_4SL': 'B',
'liquid': 'L',
'gas': 'G',
'aqueous': 'A',
'charged_phase': 'I'}
inv_phase_options = dict([reversed(i) for i in phase_options.items()])
def _process_phase(targetdb, name, typedefs, subls):
Process the PHASE command.
splitname = name.split(':')
phase_name = splitname[0].upper()
options = ''
if len(splitname) > 1:
options = splitname[1]
targetdb.add_structure_entry(phase_name, phase_name)
model_hints = {}
for option in inv_phase_options.keys():
if option in options:
model_hints[inv_phase_options[option]] = True
for typedef_char in list(typedefs):
# Model hints are updated later based on the type definitions
targetdb.add_phase(phase_name, model_hints, subls)
def _process_parameter(targetdb, param_type, phase_name, diffusing_species,
constituent_array, param_order, param, ref=None):
Process the PARAMETER command.
# sorting lx is _required_ here: see issue #17 on GitHub
targetdb.add_parameter(param_type, phase_name.upper(),
[[c.upper() for c in sorted(lx)]
for lx in constituent_array.asList()],
param_order, param, ref, diffusing_species, force_insert=False)
def _unimplemented(*args, **kwargs): #pylint: disable=W0613
Null function.
def _process_species(db, sp_name, sp_comp, charge=0, *args):
"""Add a species to the Database. If charge not specified, the Species will be neutral."""
# process the species composition list of [element1, ratio1, element2, ratio2, ..., elementN, ratioN]
constituents = {sp_comp[i]: sp_comp[i+1] for i in range(0, len(sp_comp), 2)}
db.species.add(Species(sp_name, constituents, charge=charge))
# g/mol
_molmass = \
{"H": 1.008, "HE": 4.003, "LI": 6.941, "BE": 9.012, "B": 10.811, "C": 12.011, "N": 14.007, "O": 15.999,
"F": 18.998, "NE": 20.18, "NA": 22.99, "MG": 24.305, "AL": 26.982, "SI": 28.086, "P": 30.974, "S": 32.065,
"CL": 35.453, "AR": 39.948, "K": 39.098, "CA": 40.078, "SC": 44.956, "TI": 47.867, "V": 50.942, "CR": 51.996,
"MN": 54.938, "FE": 55.845, "CO": 58.933, "NI": 58.693, "CU": 63.546, "ZN": 65.39, "GA": 69.723, "GE": 72.64,
"AS": 74.922, "SE": 78.96, "BR": 79.904, "KR": 83.8, "RB": 85.468, "SR": 87.62, "Y": 88.906, "ZR": 91.224,
"NB": 92.906, "MO": 95.94, "TC": 98, "RU": 101.07, "RH": 102.906, "PD": 106.42, "AG": 107.868, "CD": 112.411,
"IN": 114.818, "SN": 118.71, "SB": 121.76, "TE": 127.6, "I": 126.905, "XE": 131.293, "CS": 132.906,
"BA": 137.327, "LA": 138.906, "CE": 140.116, "PR": 140.908, "ND": 144.24, "PM": 145, "SM": 150.36,
"EU": 151.964, "GD": 157.25, "TB": 158.925, "DY": 162.5, "HO": 164.93, "ER": 167.259, "TM": 168.934,
"YB": 173.04, "LU": 174.967, "HF": 178.49, "TA": 180.948, "W": 183.84, "RE": 186.207, "OS": 190.23,
"IR": 192.217, "PT": 195.078, "AU": 196.967, "HG": 200.59, "TL": 204.383, "PB": 207.2, "BI": 208.98,
"PO": 209, "AT": 210, "RN": 222, "FR": 223, "RA": 226, "AC": 227, "TH": 232.038, "PA": 231.036, "U": 238.029,
"NP": 237, "PU": 244, "AM": 243, "CM": 247, "BK": 247, "CF": 251, "ES": 252, "FM": 257, "MD": 258, "NO": 259,
"LR": 262, "RF": 261, "DB": 262, "SG": 266, "BH": 264, "HS": 277, "MT": 268
def _process_reference_state(db, el, refphase, mass, H298, S298):
# If user database doesn't specify mass, use periodic table values (if the element exists)
db.refstates[el] = {
'phase': refphase,
'mass': mass if mass != 0.0 else _molmass.get(el, 0.0),
'H298': H298,
'S298': S298,
def _setitem_raise_duplicates(dictionary, key, value):
if key in dictionary:
raise ValueError("TDB contains duplicate FUNCTION {}".format(key))
dictionary[key] = value
'ELEMENT': lambda db, el, ref_phase, mass, h, s: (db.elements.add(el), _process_reference_state(db, el, ref_phase, mass, h, s), _process_species(db, el, [el, 1], 0)),
'SPECIES': _process_species,
'TYPE_DEFINITION': lambda db, typechar, line: db._typedefs_queue.append((typechar, line)),
'FUNCTION': lambda db, name, sym: _setitem_raise_duplicates(db.symbols, name, sym),
'DEFINE_SYSTEM_DEFAULT': _unimplemented,
'ASSESSED_SYSTEMS': _unimplemented,
'DEFAULT_COMMAND': _unimplemented,
'DATABASE_INFO': _unimplemented,
'VERSION_DATE': _unimplemented,
'REFERENCE_FILE': _unimplemented,
'ADD_REFERENCES': _unimplemented,
'LIST_OF_REFERENCES': _unimplemented,
'TEMPERATURE_LIMITS': _unimplemented,
'PHASE': _process_phase,
lambda db, name, c: db.add_phase_constituents(
name.split(':')[0].upper(), c),
'PARAMETER': _process_parameter,
'ZEROVOLUME_SPECIES': _unimplemented,
'DIFFUSION': _unimplemented,
def to_interval(relational):
if isinstance(relational, And):
result = UniversalSet()
for i in relational.args:
result = result.intersection(to_interval(i))
return result
elif isinstance(relational, Or):
return Union(*[to_interval(i) for i in relational.args])
elif isinstance(relational, Not):
return Complement(*[to_interval(i) for i in relational.args])
if relational == S.true:
return Interval(S.NegativeInfinity, S.Infinity, left_open=True, right_open=True)
if len(relational.free_symbols) != 1:
raise ValueError(f'Relational must only have one free symbol. Got {len(relational.free_symbols)} ({relational.free_symbols}) for relational {relational}')
if len(relational.args) != 2:
raise ValueError(f'Relational must only have two arguments. Got {len(relational.args)} ({relational.args}) for relational {relational}')
free_symbol = list(relational.free_symbols)[0]
lhs = relational.args[0]
rhs = relational.args[1]
if isinstance(relational, LessThan):
if rhs == free_symbol:
return Interval(lhs, S.Infinity, left_open=False, right_open=True)
return Interval(S.NegativeInfinity, rhs, left_open=True, right_open=False)
elif isinstance(relational, StrictLessThan):
if rhs == free_symbol:
return Interval(lhs, S.Infinity, left_open=True, right_open=False)
return Interval(S.NegativeInfinity, rhs, left_open=False, right_open=True)
raise ValueError('Unsupported Relational: {}'.format(relational.__class__.__name__))
class TCPrinter(object):
Prints Thermo-Calc style function expressions.
def __init__(self, if_incompatible='warn'):
if if_incompatible not in ('warn', 'raise', 'ignore', 'fix'):
raise ValueError('Incorrect options passed to \'if_incompatible\'. Valid args are \'raise\', \'warn\', or \'fix\'.')
self.if_incompatible = if_incompatible
def doprint(self, expr):
return self._print_Piecewise(expr)
def _stringify_expr(self, expr):
if isinstance(expr, Add):
terms = self._stringify_expr(expr.args[0])
for arg in expr.args[1:]:
adding_term = self._stringify_expr(arg)
if adding_term[0] == '-':
terms += adding_term
terms += '+' + adding_term
return terms
elif isinstance(expr, Mul):
#For cases like: A*(B+C) where an Add object is one of the arguments in Mul
#We want to explicitly add the parenthesis around Add(B,C)
#and save it to A*(B+C) rather than A*B+C
#For other types of expr, this shouldn't be an issue since:
# Mul should not need extra parenthesis, e.g. A*(B*C) -> A*B*C
# Pow will explicitly add parenthesis (in the next elif block)
# Other functions such as Log, Sin, etc should
# include the parenthesis when converting to string
#All the arguments in Mul should be tested and they're all combined to a single expression by '*'
# So we could stringify each argument as a list and join them together is '*'
term_list = ['(' + self._stringify_expr(arg) + ')' if isinstance(arg,Add) else self._stringify_expr(arg) for arg in expr.args]
terms = '*'.join(term_list)
return terms
elif isinstance(expr, Pow):
if expr.args[0] == E:
# This is the exponential function
terms = 'exp(' + self._stringify_expr(expr.args[1]) + ')'
argument = self._stringify_expr(expr.args[0])
if isinstance(expr.args[0], (Add, Mul, Pow)):
argument = '( ' + argument + ' )'
# Try coercing the exponent to an int (TC-compatible) or float
exponent = float(expr.args[1])
if exponent == int(exponent):
exponent = int(exponent)
if self.if_incompatible in ('fix', 'raise'):
raise DatabaseExportError(f'Non-integer exponents cannot be represented in TDB compatibility mode. Got: {expr}')
elif self.if_incompatible == 'warn':
warnings.warn(f'Ignoring that non-integer exponents cannot be represented in TDB compatibility mode. Got: {expr}')
except (TypeError, RuntimeError):
if self.if_incompatible in ('fix', 'raise'):
raise DatabaseExportError(f'Non-integer exponents cannot be represented in TDB compatibility mode. Got: {expr}')
elif self.if_incompatible == 'warn':
warnings.warn(f'Ignoring that non-integer exponents cannot be represented in TDB compatibility mode. Got: {expr}')
exponent = expr.args[1]
terms = argument + '**' + '(' + self._stringify_expr(exponent) + ')'
return terms
return str(expr)
def _print_Piecewise(self, expr):
# Filter out default zeros since they are implicit in a TDB
filtered_args = [(x, cond) for x, cond in zip(*[iter(expr.args)]*2) if not ((cond == S.true) and (x == S.Zero))]
exprs = [self._stringify_expr(x) for x, cond in filtered_args]
# Only a small subset of piecewise functions can be represented
# Need to verify that each cond's highlim equals the next cond's lowlim
# to_interval() is used because symengine does not implement as_set()
intervals = [to_interval(cond) for x, cond in filtered_args]
intersected_intervals = UniversalSet()
for i in intervals:
intersected_intervals = intersected_intervals.intersection(i)
if (len(intervals) > 1) and (intersected_intervals != EmptySet):
raise ValueError('Overlapping intervals cannot be represented: {}'.format(intervals))
continuous_interval = Interval(intervals[0].args[0], intervals[-1].args[1], False, True)
# XXX: Wait, should this be the intersection or the union of continuous_interval?
if Union(*intervals).union(continuous_interval) != continuous_interval:
raise ValueError('Piecewise intervals must be continuous')
if not all([cond.free_symbols == {v.T} for x, cond in filtered_args]):
raise ValueError('Only temperature-dependent piecewise conditions are supported')
# Sort expressions based on intervals
sortindices = [i[0] for i in sorted(enumerate(intervals), key=lambda x:x[1].args[0])]
exprs = [exprs[idx] for idx in sortindices]
# Infinity is implicit in TDB, so we shouldn't print it; ',' means use default value
as_str = lambda x: ',' if (x == S.Infinity) or (x == S.NegativeInfinity) else str(x)
if len(exprs) > 1:
result = '{1} {0}; {2} Y'.format(exprs[0], as_str(intervals[0].args[0]),
result += 'Y'.join([' {0}; {1} '.format(expr,
as_str(i.args[1])) for i, expr in zip(intervals[1:], exprs[1:])])
result += 'N'
result = '{0} {1}; {2} N'.format(as_str(intervals[0].args[0]), exprs[0],
return result
def reflow_text(text, linewidth=80):
Add line breaks to ensure text doesn't exceed a certain line width.
text : str
linewidth : int, optional
reflowed_text : str
lines = text.split("\n")
linebreak_chars = [" ", "$"]
output_lines = []
for line in lines:
if len(line) <= linewidth:
while len(line) > linewidth:
linebreak_idx = linewidth - 1
# Makes expressions breakable at `linebreak_chars` plus at [+-][0-9], since they are not preceded by 'E' or '('. This should prevent breaking lines inside EXP or LN/LOG or power expression: '6.14599E-07', 'T**(-3)', and 'LOG(-3)'
while linebreak_idx > 0 and not re.match(r'[^E\(][+-][0-9]', line[linebreak_idx-1:linebreak_idx+2]) and line[linebreak_idx] not in linebreak_chars:
linebreak_idx -= 1
# Need to check 2 (rather than zero) because we prepend newlines with 2 characters
if linebreak_idx <= 2:
raise ValueError(f"Unable to reflow the following line of length {len(line)} below the maximum length of {linewidth}: \n{line}")
if "$" in line:
# previous line was a comment
line = "$ " + line[linebreak_idx:]
# Always put some leading spaces at the start of a new line
# Otherwise TC may misunderstand the expression
line = " " + line[linebreak_idx:]
return "\n".join(output_lines)
def _apply_new_symbol_names(dbf, symbol_name_map):
Push changes in symbol names through the SymEngine expressions in symbols and parameters
dbf : Database
A pycalphad Database.
symbol_name_map : dict
Map of {old_symbol_name: new_symbol_name}
# first apply the rename to the keys
dbf.symbols = {symbol_name_map.get(name, name): expr for name, expr in dbf.symbols.items()}
# then propagate through to the symbol SymEngine expression values
dbf.symbols = {name: S(expr).xreplace({Symbol(s): Symbol(v) for s, v in symbol_name_map.items()}) for name, expr in dbf.symbols.items()}
# finally propagate through to the parameters
for p in dbf._parameters.all():
dbf._parameters.update({'parameter': S(p['parameter']).xreplace({Symbol(s): Symbol(v) for s, v in symbol_name_map.items()})}, doc_ids=[p.doc_id])
# Keys should correspond to the model hints added via the `phase_options` dict
"symmetry_FCC_4SL": [[0, 1, 2, 3]],
"symmetry_BCC_4SL": [[0, 1], [2, 3]],
def add_phase_symmetry_ordering_parameters(dbf):
for phase_name, phase_obj in dbf.phases.items():
if phase_obj.model_hints.get("ordered_phase", "") == phase_name:
for symmetry_hint, symmetry in KNOWN_SUBLATTICE_SYMMETRY_RELATIONS.items():
if phase_obj.model_hints.get(symmetry_hint, False):
for param in dbf.search(where("phase_name") == phase_name):
const_array = param["constituent_array"]
for symm_unique_const_array in set(generate_symmetric_group(const_array, symmetry)) - {const_array}:
new_param = {key: val for key, val in param.items()}
new_param["constituent_array"] = symm_unique_const_array
new_param["_generated_by_symmetry_option"] = True # flag to be able to remove it later and preserve the phase option
def _symmetry_added_parameter(dbf, param):
Return true if parameter belongs to a phase with an active symmetry
option and the parameter was added by a symmetry option.
phase_obj = dbf.phases.get(param["phase_name"])
if phase_obj is None:
# Phase isn't in the database at all, so it's impossible for this parameter
# to get added by symmetry
return False
for symm_hint in set(KNOWN_SUBLATTICE_SYMMETRY_RELATIONS.keys()).intersection(phase_obj.model_hints.keys()):
if phase_obj.model_hints[symm_hint] and param.get("_generated_by_symmetry_option", False):
return True
return False
def get_supported_variables():
"When loading databases, these symbols should be replaced by their IndependentPotential counterparts"
return {Symbol(x): getattr(v, x) for x in v.__dict__ if isinstance(getattr(v, x), (v.IndependentPotential, Float))}
def write_tdb(dbf, fd, groupby='subsystem', if_incompatible='warn'):
Write a TDB file from a pycalphad Database object.
The goal is to produce TDBs that conform to the most restrictive subset of database specifications. Some of these
can be adjusted for automatically, such as the Thermo-Calc line length limit of 78. Others require changing the
database in non-trivial ways, such as the maximum length of function names (8). The default is to warn the user when
attempting to write an incompatible database and the user must choose whether to warn and write the file anyway or
to fix the incompatibility.
Currently the supported compatibility fixes are:
- Line length <= 78 characters (Thermo-Calc)
- Function names <= 8 characters (Thermo-Calc)
The current unsupported fixes include:
- Keyword length <= 2000 characters (Thermo-Calc)
- Element names <= 2 characters (Thermo-Calc)
- Phase names <= 24 characters (Thermo-Calc)
Other TDB compatibility issues required by Thermo-Calc or other software should be reported to the issue tracker.
dbf : Database
A pycalphad Database.
fd : file-like
File descriptor.
groupby : ['subsystem', 'phase'], optional
Desired grouping of parameters in the file.
if_incompatible : string, optional ['raise', 'warn', 'fix']
Strategy if the database does not conform to the most restrictive database specification.
The 'warn' option (default) will write out the incompatible database with a warning.
The 'raise' option will raise a DatabaseExportError.
The 'ignore' option will write out the incompatible database silently.
The 'fix' option will rectify the incompatibilities e.g. through name mangling.
# Before writing anything, check that the TDB is valid and take the appropriate action if not
if if_incompatible not in ('warn', 'raise', 'ignore', 'fix'):
raise ValueError('Incorrect options passed to \'if_incompatible\'. Valid args are \'raise\', \'warn\', or \'fix\'.')
# Handle function names > 8 characters
long_function_names = {k for k in dbf.symbols.keys() if len(k) > 8}
if len(long_function_names) > 0:
if if_incompatible == 'raise':
raise DatabaseExportError('The following function names are beyond the 8 character TDB limit: {}. Use the keyword argument \'if_incompatible\' to control this behavior.'.format(long_function_names))
elif if_incompatible == 'fix':
# if we are going to make changes, make the changes to a copy and leave the original object untouched
dbf = deepcopy(dbf) # TODO: if we do multiple fixes, we should only copy once
symbol_name_map = {}
for name in long_function_names:
hashed_name = 'F' + str(hashlib.md5(name.encode('UTF-8')).hexdigest()).upper()[:7] # this is implictly upper(), but it is explicit here
symbol_name_map[name] = hashed_name
_apply_new_symbol_names(dbf, symbol_name_map)
elif if_incompatible == 'warn':
warnings.warn('Ignoring that the following function names are beyond the 8 character TDB limit: {}. Use the keyword argument \'if_incompatible\' to control this behavior.'.format(long_function_names))
# Begin constructing the written database
writetime = datetime.datetime.now()
maxlen = 78
output = ""
# Comment header block
# Import here to prevent circular imports
from pycalphad import __version__
# getuser() will raise on Windows if it can't find a username: https://bugs.python.org/issue32731
username = getpass.getuser()
# if we can't find a good username, just choose a default and move on
username = 'user'
output += ("$" * maxlen) + "\n"
output += "$ Date: {}\n".format(writetime.strftime("%Y-%m-%d %H:%M"))
output += "$ Components: {}\n".format(', '.join(sorted(dbf.elements)))
output += "$ Phases: {}\n".format(', '.join(sorted(dbf.phases.keys())))
output += "$ Generated by {} (pycalphad {})\n".format(username, __version__)
output += ("$" * maxlen) + "\n\n"
for element in sorted(dbf.elements):
ref = dbf.refstates.get(element, {})
refphase = ref.get('phase', 'BLANK')
mass = ref.get('mass', 0.0)
H298 = ref.get('H298', 0.0)
S298 = ref.get('S298', 0.0)
output += "ELEMENT {0} {1} {2} {3} {4} !\n".format(element.upper(), refphase, mass, H298, S298)
if len(dbf.elements) > 0:
output += "\n"
for species in sorted(dbf.species, key=lambda s: s.name):
if species.name not in dbf.elements:
# construct the charge part of the specie
if species.charge != 0:
if species.charge >0:
charge_sign = '+'
charge_sign = ''
charge = '/{}{}'.format(charge_sign, species.charge)
charge = ''
species_constituents = ''.join(['{}{}'.format(el, val) for el, val in sorted(species.constituents.items(), key=lambda t: t[0])])
output += "SPECIES {0} {1}{2} !\n".format(species.name.upper(), species_constituents, charge)
if len(dbf.species) > 0:
output += "\n"
# Write FUNCTION block
for name, expr in sorted(dbf.symbols.items()):
if not isinstance(expr, Piecewise):
# Non-piecewise exprs need to be wrapped to print
# Otherwise TC's TDB parser will complain
expr = Piecewise((expr, And(v.T >= 1, v.T < 10000)))
expr = TCPrinter().doprint(expr).upper()
if ';' not in expr:
expr += '; N'
output += "FUNCTION {0} {1} !\n".format(name.upper(), expr)
output += "\n"
# Boilerplate code
output += "TYPE_DEFINITION % SEQ * !\n"
default_elements = [i.upper() for i in sorted(dbf.elements) if i.upper() == 'VA' or i.upper() == '/-']
if len(default_elements) > 0:
output += 'DEFAULT_COMMAND DEFINE_SYSTEM_ELEMENT {} !\n'.format(' '.join(default_elements))
output += "\n"
typedef_chars = list("^&*()'ABCDEFGHIJKLMNOPQSRTUVWXYZ")[::-1]
# Write necessary TYPE_DEF based on model hints
typedefs = defaultdict(lambda: ["%"])
for name, phase_obj in sorted(dbf.phases.items()):
model_hints = phase_obj.model_hints.copy()
possible_options = set(phase_options.keys()).intersection(model_hints)
# Phase options are handled later
for option in possible_options:
del model_hints[option]
if ('ordered_phase' in model_hints.keys()) and (model_hints['ordered_phase'] == name):
new_char = typedef_chars.pop()
.format(new_char, model_hints['ordered_phase'].upper(),
del model_hints['ordered_phase']
del model_hints['disordered_phase']
if ('disordered_phase' in model_hints.keys()) and (model_hints['disordered_phase'] == name):
# We handle adding the correct typedef when we write the ordered phase
del model_hints['ordered_phase']
del model_hints['disordered_phase']
if 'ihj_magnetic_afm_factor' in model_hints.keys():
new_char = typedef_chars.pop()
.format(new_char, name.upper(), model_hints['ihj_magnetic_afm_factor'],
del model_hints['ihj_magnetic_afm_factor']
del model_hints['ihj_magnetic_structure_factor']
if len(model_hints) > 0:
# Some model hints were not properly consumed
raise ValueError('Not all model hints are supported: {}'.format(model_hints))
# Perform a second loop now that all typedefs / model hints are consistent
for name, phase_obj in sorted(dbf.phases.items()):
# model_hints may also contain "phase options", e.g., ionic liquid
model_hints = phase_obj.model_hints.copy()
name_with_options = str(name.upper())
possible_options = set(phase_options.keys()).intersection(model_hints.keys())
if len(possible_options) > 0:
name_with_options += ':'
for option in possible_options:
name_with_options += phase_options[option]
output += "PHASE {0} {1} {2} {3} !\n".format(name_with_options, ''.join(typedefs[name]),
' '.join([str(i) for i in phase_obj.sublattices]))
constituents = ':'.join([', '.join([spec.name for spec in sorted(subl)]) for subl in phase_obj.constituents])
output += "CONSTITUENT {0} :{1}: !\n".format(name_with_options, constituents)
output += "\n"
# PARAMETERs by subsystem
param_sorted = defaultdict(lambda: list())
paramtuple = namedtuple('ParamTuple', ['phase_name', 'parameter_type', 'complexity', 'constituent_array',
'parameter_order', 'diffusing_species', 'parameter', 'reference'])
for param in dbf._parameters.all():
if _symmetry_added_parameter(dbf, param):
continue # skip this parameter
if groupby == 'subsystem':
components = set()
for subl in param['constituent_array']:
components |= set(subl)
if param['diffusing_species'] != Species(None):
components |= {param['diffusing_species']}
# Wildcard operator is not a component
components -= {'*'}
desired_active_pure_elements = [list(x.constituents.keys()) for x in components]
components = set([el.upper() for constituents in desired_active_pure_elements for el in constituents])
# Remove vacancy if it's not the only component (pure vacancy endmember)
if len(components) > 1:
components -= {'VA'}
components = tuple(sorted([c.upper() for c in components]))
grouping = components
elif groupby == 'phase':
grouping = param['phase_name'].upper()
raise ValueError('Unknown groupby attribute \'{}\''.format(groupby))
# We use the complexity parameter to help with sorting the parameters logically
param_sorted[grouping].append(paramtuple(param['phase_name'], param['parameter_type'],
sum([len(i) for i in param['constituent_array']]),
param['constituent_array'], param['parameter_order'],
param['diffusing_species'], param['parameter'],
def write_parameter(param_to_write):
constituents = ':'.join([','.join(sorted([i.name.upper() for i in subl]))
for subl in param_to_write.constituent_array])
# TODO: Handle references
paramx = param_to_write.parameter
if not isinstance(paramx, Piecewise):
# Non-piecewise parameters need to be wrapped to print correctly
# Otherwise TC's TDB parser will fail
paramx = Piecewise((paramx, And(v.T >= 1, v.T < 10000)))
exprx = TCPrinter(if_incompatible=if_incompatible).doprint(paramx).upper()
if ';' not in exprx:
exprx += '; N'
if param_to_write.diffusing_species != Species(None):
ds = "&" + param_to_write.diffusing_species.name
ds = ""
return "PARAMETER {}({}{},{};{}) {} !\n".format(param_to_write.parameter_type.upper(),
if groupby == 'subsystem':
for num_species in range(1, 5):
subsystems = list(itertools.combinations(sorted([i.name.upper() for i in dbf.species]), num_species))
for subsystem in subsystems:
parameters = sorted(param_sorted[subsystem])
if len(parameters) > 0:
output += "\n\n"
output += "$" * maxlen + "\n"
output += "$ {}".format('-'.join(sorted(subsystem)).center(maxlen, " ")[2:-1]) + "$\n"
output += "$" * maxlen + "\n"
output += "\n"
for parameter in parameters:
output += write_parameter(parameter)
# Don't generate combinatorics for multi-component subsystems or we'll run out of memory
if len(dbf.species) > 4:
subsystems = [k for k in param_sorted.keys() if len(k) > 4]
for subsystem in subsystems:
parameters = sorted(param_sorted[subsystem])
for parameter in parameters:
output += write_parameter(parameter)
elif groupby == 'phase':
for phase_name in sorted(dbf.phases.keys()):
parameters = sorted(param_sorted[phase_name])
if len(parameters) > 0:
output += "\n\n"
output += "$" * maxlen + "\n"
output += "$ {}".format(phase_name.upper().center(maxlen, " ")[2:-1]) + "$\n"
output += "$" * maxlen + "\n"
output += "\n"
for parameter in parameters:
output += write_parameter(parameter)
raise ValueError('Unknown groupby attribute {}'.format(groupby))
# Reflow text to respect character limit per line
fd.write(reflow_text(output, linewidth=maxlen))
def read_tdb(dbf, fd):
Parse a TDB file into a pycalphad Database object.
dbf : Database
A pycalphad Database.
fd : file-like
File descriptor.
lines = fd.read().upper()
lines = lines.replace('\t', ' ')
# Split the string by newlines
splitlines = lines.split('\n')
# Remove comments
splitlines = [k.split('$', 1)[0] for k in splitlines]
# Remove everything after command delimiter, but keep the delimiter so we can split later
splitlines = [k.split('!')[0] + ('!' if len(k.split('!')) > 1 else '') for k in splitlines]
# Combine everything back together
lines = ' '.join(splitlines)
# Now split by the command delimeter
commands = lines.split('!')
# Temporarily track which typedef characters were used by which phase
# before we process the type definitions
# Map {typedef character: [phases using that typedef]}
dbf._typechar_map = defaultdict(list)
dbf._typedefs_queue = [] # queue of type defintion lines to process
grammar = _tdb_grammar()
char_idx = 0
for command in commands:
if len(command) == 0:
tokens = grammar.parseString(command)
_TDB_PROCESSOR[tokens[0]](dbf, *tokens[1:])
except ParseException as e:
context = e.line + '\n' + (" " * (e.column - 1) + "^")
# pyparsing is only given one line at a time, so we modify
# the exception metadata to correspond with the original input
err_char_idx = char_idx + (e.column - 1) # e.column is an index that starts at 1
joinedlines = "\n".join(splitlines)
e.pstr = joinedlines
e.loc = err_char_idx
# context variable includes a helpful cursor aligned with the 'error character'
# this requires being on a newline so it renders correctly in consoles
e.msg = f'Invalid TDB syntax.\n{context}'
raise e
# Add 1 for removed '!' delimiter
char_idx += len(command) + 1
# Process type definitions last, updating model_hints for defined phases.
for typechar, line in dbf._typedefs_queue:
_process_typedef(dbf, typechar, line)
# Raise warnings if there are any remaining type characters that one or more
# phases expected to be defined
for typechar, phases_expecting_typechar in dbf._typechar_map.items():
warnings.warn(f"The type definition character `{typechar}` was defined in the following phases: "
f"{phases_expecting_typechar}, but no corresponding TYPE_DEFINITION line was found in the TDB.")
del dbf._typechar_map
del dbf._typedefs_queue
# Add phase option B/F parameters
# Must occur after adding model hints and parameters
Database.register_format("tdb", read=read_tdb, write=write_tdb)