Source code for pycalphad.io.database

"""
The database module provides support for reading and writing data types
associated with structured thermodynamic/kinetic data.
"""
from tinydb import TinyDB
from tinydb.storages import MemoryStorage
from datetime import datetime
from collections import namedtuple
import os
from pycalphad.variables import Species
from pycalphad.core.cache import fhash
try:
    # Python 2
    from StringIO import StringIO
except ImportError:
    # Python 3
    from io import StringIO

# handle missing FileExistsError in Python2
try:
    FileExistsError = FileExistsError
except NameError:
    class FileExistsError(OSError):
        """Python 2 backported FileExistsError wrapping OSError"""
        pass

[docs]class DatabaseExportError(Exception): """Raised when a database cannot be written.""" pass
def _to_tuple(lst): "Convert nested list to nested tuple. Source: Martijn Pieters on StackOverflow" return tuple(_to_tuple(i) if isinstance(i, list) else i for i in lst)
[docs]class Phase(object): #pylint: disable=R0903 """ Phase in the database. Attributes ---------- name : string System-local name of the phase. constituents : tuple of frozenset Possible sublattice constituents (elements and/or species). sublattices : list Site ratios of sublattices. model_hints : dict Structured "hints" for a Model trying to read this phase. Hints for major constituents and typedefs (Thermo-Calc) go here. """ def __init__(self): self.name = None self.constituents = None self.sublattices = [] self.model_hints = {} def __eq__(self, other): if type(self) == type(other): return self.__dict__ == other.__dict__ else: return False def __ne__(self, other): return not self.__eq__(other) def __repr__(self): return 'Phase({0!r})'.format(self.__dict__) def __hash__(self): return hash((self.name, self.constituents, tuple(self.sublattices), tuple(sorted(_to_tuple(self.model_hints.items())))))
DatabaseFormat = namedtuple('DatabaseFormat', ['read', 'write']) format_registry = {}
[docs]class Database(object): #pylint: disable=R0902 """ Structured thermodynamic and/or kinetic data. Attributes ---------- elements : list List of elements in database. species : list List of species in database. phases : dict Phase objects indexed by their system-local name. symbols : dict SymPy objects indexed by their name (FUNCTIONs in Thermo-Calc). references : dict Reference objects indexed by their system-local identifier. Examples -------- >>> mydb = Database(open('crfeni_mie.tdb')) >>> mydb = Database('crfeni_mie.tdb') >>> f = StringIO(u'$a complete TDB file as a string\n') >>> mydb = Database(f) """ def __new__(cls, *args): if len(args) == 0: obj = super(Database, cls).__new__(cls, *args) # Should elements be rolled into a special case of species? obj.elements = set() obj.species = set() obj.phases = {} obj.refstates = {} obj._structure_dict = {} # System-local phase names to global IDs obj._parameters = TinyDB(storage=MemoryStorage) obj.symbols = {} obj.references = {} # Note: No public typedefs here (from TDB files) # Instead we put that information in the model_hint for phases return obj elif len(args) == 1: fname = args[0] # Backwards compatibility: assume TDB by default fmt = 'tdb' # Attempt to auto-detect the correct format based on the file extension try: path, ext = os.path.splitext(fname) if '.' in ext and ext[1:].lower() in format_registry: fmt = ext[1:].lower() except (AttributeError, TypeError): pass if hasattr(fname, 'read'): # File descriptor return cls.from_file(fname, fmt=fmt) elif fname.find('\n') == -1: # Single-line string; it's probably a filename return cls.from_file(fname, fmt=fmt) else: # Newlines found: probably a full database string return cls.from_string(fname, fmt=fmt) else: raise ValueError('Invalid number of parameters: '+len(args)) def __hash__(self): return fhash(self.__dict__) def __getstate__(self): pickle_dict = {} for key, value in self.__dict__.items(): if key == '_parameters': pickle_dict[key] = value.all() else: pickle_dict[key] = value return pickle_dict def __setstate__(self, state): for key, value in state.items(): if key == '_parameters': self._parameters = TinyDB(storage=MemoryStorage) self._parameters.insert_multiple(value) else: setattr(self, key, value)
[docs] @staticmethod def register_format(fmt, read=None, write=None): """ Add support for reading and/or writing the specified format. Parameters ---------- fmt: str Format. read : callable, optional Read function with arguments (Database, file_descriptor) write : callable, optional Write function with arguments (Database, file_descriptor) Examples -------- None yet. """ format_registry[fmt.lower()] = DatabaseFormat(read=read, write=write)
[docs] @staticmethod def from_file(fname, fmt=None): """ Create a Database from a file. Parameters ---------- fname: str or file-like File name/descriptor to read. fmt : str, optional File format. If not specified, an attempt at auto-detection is made. Returns ------- dbf : Database Database from file. Examples -------- None yet. """ if fmt is None: # Attempt to auto-detect the correct format based on the file extension try: path, ext = os.path.splitext(fname) except (AttributeError, TypeError): # fname isn't actually a path, so we don't know the correct format raise ValueError('\'fmt\' keyword argument must be specified when passing a file descriptor.') if '.' in ext and ext[1:].lower() in format_registry: fmt = ext[1:].lower() else: fmt = fmt.lower() if fmt not in format_registry or format_registry[fmt].read is None: supported_reads = [key for key, value in format_registry.items() if value.read is not None] raise NotImplementedError('Unsupported read format \'{0}\'. Supported formats: {1}'.format(fmt, supported_reads)) # Is it a file descriptor? if hasattr(fname, 'read'): fd = fname need_to_close = False else: # It's not file-like, so it's probably a filename need_to_close = True fd = open(fname, mode='r') try: dbf = Database() format_registry[fmt.lower()].read(dbf, fd) finally: # Close file descriptors created in this routine # Otherwise that's left up to the calling function if need_to_close: fd.close() return dbf
[docs] @classmethod def from_string(cls, data, **kwargs): """ Returns Database from a string in the specified format. This function is a wrapper for calling `from_file` with StringIO. Parameters ---------- data : str Raw database string in the specified format. kwargs : optional See keyword arguments for `from_file`. Returns ------- dbf : Database """ return cls.from_file(StringIO(data), **kwargs)
[docs] def to_file(self, fname, fmt=None, if_exists='raise', **write_kwargs): """ Write the Database to a file. Parameters ---------- fname: str or file-like File name/descriptor to write. fmt : str, optional File format. If not specified, an attempt at auto-detection is made. if_exists : string, optional ['raise', 'rename', 'overwrite'] Strategy if 'fname' already exists. The 'raise' option (default) will raise a FileExistsError. The 'rename' option will append the date/time to the filename. The 'overwrite' option will overwrite the file. This argument is ignored if 'fname' is file-like. write_kwargs : optional Keyword arguments to pass to write function. Examples -------- None yet. """ if fmt is None: # Attempt to auto-detect the correct format based on the file extension try: path, ext = os.path.splitext(fname) except (AttributeError, TypeError): # fname isn't actually a path, so we don't know the correct format raise ValueError('\'fmt\' keyword argument must be specified when passing a file descriptor.') if '.' in ext and ext[1:].lower() in format_registry: fmt = ext[1:].lower() else: fmt = fmt.lower() if fmt not in format_registry or format_registry[fmt].write is None: supported_writes = [key for key, value in format_registry.items() if value.write is not None] raise NotImplementedError('Unsupported write format \'{0}\'. Supported formats: {1}'.format(fmt, supported_writes)) # Is this a file descriptor? if hasattr(fname, 'write'): format_registry[fmt].write(self, fname, **write_kwargs) else: if os.path.exists(fname) and if_exists != 'overwrite': if if_exists == 'rename': writetime = datetime.now() fname = os.path.splitext(fname) fname = fname[0] + "." + writetime.strftime("%Y-%m-%d-%H-%M") + fname[1] else: # equivalent to 'raise' raise FileExistsError('File {} already exists'.format(fname)) with open(fname, mode='w') as fd: format_registry[fmt].write(self, fd, **write_kwargs)
[docs] def to_string(self, **kwargs): """ Returns Database as a string. This function is a wrapper for calling `to_file` with StringIO. Parameters ---------- kwargs : optional See keyword arguments for `to_file`. Returns ------- result : str """ result = StringIO() self.to_file(result, **kwargs) return result.getvalue()
def __str__(self): result = 'Elements: {0}\n'.format(sorted(self.elements)) result += 'Species: {0}\n'.format(sorted(self.species, key=lambda s: s.name)) for name, phase in sorted(self.phases.items()): result += str(phase)+'\n' result += '{0} symbols in database\n'.format(len(self.symbols)) result += '{0} parameters in database\n'.format(len(self._parameters)) return result def __eq__(self, other): if self is other: return True elif type(self) != type(other): return False elif sorted(self.__dict__.keys()) != sorted(other.__dict__.keys()): return False else: def param_sort_key(x): return x['phase_name'], x['parameter_type'], x['constituent_array'], \ x['parameter_order'], x['diffusing_species'] for key in self.__dict__.keys(): if key == '_parameters': # Special handling for TinyDB objects if len(self._parameters.all()) != len(other._parameters.all()): return False self_params = sorted(self._parameters.all(), key=param_sort_key) other_params = sorted(other._parameters.all(), key=param_sort_key) if self_params != other_params: return False elif self.__dict__[key] != other.__dict__[key]: return False return True def __ne__(self, other): return not self.__eq__(other)
[docs] def add_structure_entry(self, local_name, global_name): """ Define a relation between the system-local name of a phase and a "global" identifier. This is used to link crystallographically similar phases known by different colloquial names. Parameters ---------- local_name : string System-local name of the phase. global_name : object Abstract representation of symbol, e.g., in SymPy format. Examples -------- None yet. """ self._structure_dict[local_name] = global_name
[docs] def add_parameter(self, param_type, phase_name, #pylint: disable=R0913 constituent_array, param_order, param, ref=None, diffusing_species=None): """ Add a parameter. Parameters ---------- param_type : str Type name of the parameter, e.g., G, L, BMAGN. phase_name : str Name of the phase. constituent_array : list Configuration of the sublattices (elements and/or species). param_order : int Polynomial order of the parameter. param : object Abstract representation of the parameter, e.g., in SymPy format. ref : str, optional Reference for the parameter. diffusing_species : str, optional (If kinetic parameter) Diffusing species for this parameter. Examples -------- None yet. """ species_dict = {s.name: s for s in self.species} new_parameter = { 'phase_name': phase_name, 'constituent_array': tuple(tuple(species_dict.get(s.upper(), Species(s)) for s in xs) for xs in constituent_array), # must be hashable type 'parameter_type': param_type, 'parameter_order': param_order, 'parameter': param, 'diffusing_species': Species(diffusing_species), 'reference': ref } param_id = self._parameters.insert(new_parameter) return param_id
[docs] def add_phase(self, phase_name, model_hints, sublattices): """ Add a phase. Parameters ---------- phase_name : string System-local name of the phase. model_hints : list Structured "hints" for a Model trying to read this phase. Hints for major constituents and typedefs (Thermo-Calc) go here. sublattices : list Site ratios of sublattices. Examples -------- None yet. """ new_phase = Phase() new_phase.name = phase_name # Need to convert from ParseResults or else equality testing will break new_phase.sublattices = tuple(sublattices) new_phase.model_hints = model_hints self.phases[phase_name] = new_phase
[docs] def add_phase_constituents(self, phase_name, constituents): """ Add a phase. Parameters ---------- phase_name : string System-local name of the phase. constituents : list Possible phase constituents (elements and/or species). Examples -------- None yet. """ species_dict = {s.name: s for s in self.species} try: # Need to convert constituents from ParseResults # Otherwise equality testing will be broken self.phases[phase_name].constituents = tuple([frozenset([species_dict[s.upper()] for s in xs]) for xs in constituents]) except KeyError: print("Undefined phase "+phase_name) raise
[docs] def search(self, query): """ Search for parameters matching the specified query. Parameters ---------- query : object Structured database query in TinyDB format. Examples -------- >>>> from tinydb import where >>>> db = Database() >>>> eid = db.add_parameter(...) #TODO >>>> db.search(where('eid') == eid) """ return self._parameters.search(query)