"""
.. module:: read_mc
:synopsis: Read routines and data structure for molecular cloud (0D chemistry) |prodimo| models.
.. moduleauthor:: Ch. Rab
"""
import os
import numpy as np
from prodimopy.read import Chemistry
import prodimopy.chemistry.network as pcnet
from timeit import default_timer as timer
[docs]
class MCParameterSet:
"""
A class to hold (some) physical parameters for one model or for one time step in a dynamic model.
Can also be used to to generate input files the the molecular cloud mode of ProDiMo.
The order of the parameters is currently fixed and has to match the order in the input file for dynamic models (mc_dynamic.in).
This also means this class only contains those parameters that can be set in the input file for dynamic models.
"""
def __init__(self):
self.params: dict = {
"nH": None,
"td": None,
"tg": None,
"CHI1": None,
"CHI2": None,
"a1": None,
"dust_to_gas": None,
"rho_gr": None,
"vturb": None,
"CRI": None,
"Av": None,
"albedo": None,
}
""" :
Dictionary with the parameters. Do not change the order.
"""
@property
def nH(self):
return self.params["nH"]
@nH.setter
def nH(self, value):
self.params["nH"] = value
@property
def td(self):
return self.params["td"]
@td.setter
def td(self, value):
self.params["td"] = value
@property
def tg(self):
return self.params["tg"]
@tg.setter
def tg(self, value):
self.params["tg"] = value
@property
def CHI1(self):
return self.params["CHI1"]
@CHI1.setter
def CHI1(self, value):
self.params["CHI1"] = value
@property
def CHI2(self):
return self.params["CHI2"]
@CHI2.setter
def CHI2(self, value):
self.params["CHI2"] = value
@property
def a1(self):
return self.params["a1"]
@a1.setter
def a1(self, value):
self.params["a1"] = value
@property
def dust_to_gas(self):
return self.params["dust_to_gas"]
@dust_to_gas.setter
def dust_to_gas(self, value):
self.params["dust_to_gas"] = value
@property
def rho_gr(self):
return self.params["rho_gr"]
@rho_gr.setter
def rho_gr(self, value):
self.params["rho_gr"] = value
@property
def vturb(self):
return self.params["vturb"]
@vturb.setter
def vturb(self, value):
self.params["vturb"] = value
@property
def CRI(self):
return self.params["CRI"]
@CRI.setter
def CRI(self, value):
self.params["CRI"] = value
@property
def Av(self):
return self.params["Av"]
@Av.setter
def Av(self, value):
self.params["Av"] = value
@property
def albedo(self):
return self.params["albedo"]
@albedo.setter
def albedo(self, value):
self.params["albedo"] = value
[docs]
def fill_fromStr(self, s):
values = s.split()
for key, val in zip(self.params.keys(), values):
self.params[key] = float(val)
def __str__(self):
return " ".join(f"{value}" for value in self.params.values())
__repr__ = __str__
[docs]
class Data_mc(object):
"""
Data structure for one molecular cloud (0D chemistry) |prodimo| model.
Can be used for a time-dependent, steady-state (or final) or for a dynamic model.
The dynamic ones can have physical properties as function of f(t).
"""
def __init__(self, name: str | None = None):
"""
Parameters
----------
name : string
The name of the model.
Attributes
----------
"""
self.name = name
""" string :
The name of the model (can be empty)
"""
self._imodel: int | None = None
""" : The model number if the model is part of a series of models """
self._dynamic_model: bool = False
"""
Is it a dynamic model?
"""
self.directory: str | None = None
""" : The directory from which the model was read. Is e.g. set :meth:`~prodimopy.read_mc.read_mc`. Can be a relative path.
"""
self.species = None
""" array_like(string,ndim=1) :
an ordered list of species names.
"""
self.ages = None
""" array_like(float,ndim=1) :
the output ages of the model.
"""
self.physicalParameters: MCParameterSet | list[MCParameterSet] | None = None
"""
The physical parameters of the model. In case of a dynamic model it will be a list.
"""
self.ratecoefficients = None
""" array_like(float,ndim=1) :
The rate coefficients (just the rates as an array)
"""
self.abundances = None
""" array_like(float,ndim=2) :
the abundances for each species and each age `DIMS:` (number of ages,number of species).
"""
self._chemnet: pcnet.ReactionNetworkPout = None
@property
def chemnet(self) -> pcnet.ReactionNetworkPout:
"""The chemical reactions of the model. Holds what is in ``Reactions.out``. Is only read if needed."""
if self._chemnet is None:
self._chemnet = pcnet.ReactionNetworkPout(name=self.name, modeldir=self.directory)
return self._chemnet
@chemnet.setter
def chemnet(self, value: pcnet.ReactionNetworkPout):
self._chemnet = value
[docs]
def analyse_chemistry(
self,
species: str,
filenameChemistry: str | None = None,
to_txt: bool = True,
screenout: bool = False,
) -> list[Chemistry]:
"""
Routine to create a Chemistry analysis object for the given species, if the chemanalysis data is available.
Parameters
----------
species :
The species to analyse.
filenameChemistry :
The name of the file that holds the reaction rates. Usually one does not need to change that.
to_txt :
Write info about formation and destruction reactions for the selected molecule to a txt file.
screenout :
If ``True`` all the form./dest. reactions are printed on the screen.
Returns
-------
list[Chemistry] :
List of objects that hold all the required information and can be used for the plotting routines or for :func:`~prodimopy.read.reac_rates_ix_iz`.
"""
import astropy.io.fits as fits
start = timer()
nage = self.ages.shape[0]
if filenameChemistry is None:
filenameChemistry = "mc_chemanalysis_001.fits"
if self._imodel is not None:
filenameChemistry = f"mc_chemanalysis_{self._imodel:03d}.fits"
chemanas = list()
fidx = list() # indices of all unique formation reactions
didx = list() # indices of all unique destruction reactions
# that gives me simply all formation and destruction reactions for the given species
# those include the real ids (e.g. starts at 1) and not the zero-based python indices!!
fidx = np.array([x.id for x in self.chemnet.reactions if species in x.products])
didx = np.array([x.id for x in self.chemnet.reactions if species in x.reactants])
# now read all info from the fits file
cfits = fits.open(
os.path.join(self.directory, filenameChemistry),
do_not_scale_image_data=True,
memmap=True,
)
print("READ: Reading chemistry analysis from file: ", filenameChemistry, " ...")
# create one Chemistry object per timestep
formrates = cfits[0].data[fidx - 1, :] # is read in reversed order for the dims
destrates = cfits[0].data[didx - 1, :]
formrates = formrates.T # transpose it to have it in nage,ncreac ...
destrates = destrates.T # transpose it to have it in nage,ncreac ...
cfits.close()
if formrates.shape[0] == (nage - 1):
formrates = np.insert(formrates, 0, 0.0, axis=0)
destrates = np.insert(destrates, 0, 0.0, axis=0)
for iage in range(nage):
# stores all the formation reaction indices and rates for each grid point
# still use one grid point, to be compatible with the plotting routines
gridf = np.empty((1, 1, 2), dtype=np.ndarray)
gridd = np.empty((1, 1, 2), dtype=np.ndarray)
totfrate = np.zeros((1, 1)) # total formation rate at each point)
totdrate = np.zeros((1, 1)) # total destruction rate at each point)
# sorted indices for the formation and destruction rates
formridx = np.flip(np.argsort(formrates[iage, :])) # reversed order
destridx = np.flip(np.argsort(destrates[iage, :])) # reversed order
gridf[0, 0, 0] = fidx[formridx[:]] # formation reaction indices, sorted
gridf[0, 0, 1] = formrates[iage, formridx[:]]
gridd[0, 0, 0] = didx[destridx[:]] # destruction reaction indices, sorted
gridd[0, 0, 1] = destrates[iage, destridx[:]]
totfrate[0, 0] = np.sum(formrates[iage, :]) # total formation rate at each point
totdrate[0, 0] = np.sum(destrates[iage, :]) # total destruction rate at each point
chemistry = Chemistry(self.name)
chemistry.species = species
chemistry.gridf = gridf
chemistry.gridd = gridd
chemistry.fridxs = fidx
chemistry.dridxs = didx
chemistry.totfrate = totfrate
chemistry.totdrate = totdrate
chemistry.model = self # reference to the underlying model
if to_txt:
output_chem_fname = os.path.join(
self.directory, "chemistry_reactions_" + species + ".txt"
)
output_chem_fname = output_chem_fname.replace(".txt", "_" + str(iage) + ".txt")
f = open(output_chem_fname, "w")
f.writelines("-------------------------------------------------------\n")
f.writelines("formation and destruction reactions \n")
f.writelines("species: " + species + "\n\n")
f.writelines("Formation reactions\n")
for i, ridx in enumerate(chemistry.fridxs):
f.writelines(chemistry.reac_to_str(self.chemnet.reactions[ridx - 1], i + 1))
f.writelines("\n")
f.writelines("\n\n")
f.writelines("Destruction reactions\n")
for i, ridx in enumerate(chemistry.dridxs):
f.writelines(chemistry.reac_to_str(self.chemnet.reactions[ridx - 1], i + 1))
f.writelines("\n")
f.writelines("-------------------------------------------------------\n")
f.close()
print("INFO: Writing information to: " + output_chem_fname)
# also print it to stdout
if screenout:
with open(output_chem_fname) as f:
print(f.read())
chemanas.append(chemistry)
print("INFO: chemanalysis time: ", "{:4.2f}".format(timer() - start) + " s")
print("")
return chemanas
def __str__(self):
output = "Info MC: \n"
output += "\n Species: \n"
output += str(self.species)
output += "\n"
output += "\n Ages: \n"
output += str(self.ages)
return output
def _read_ages(filename: str) -> np.ndarray:
"""Reads a files with ages, and add a zero age at the beginning (i.e. initial abundances).
Parameters
----------
filename :
The filename of the file containing the ages.
Returns
-------
array_like(float,ndim=1) :
The ages of the model. The first age is set to zero and is not read from the file. The rest of the ages are read from the file.
"""
print("READ: Reading File: ", filename, " ...")
ages = np.loadtxt(filename)
# insert age zero initial abundance
ages = np.insert(ages, 0, 0.0)
return ages
[docs]
def read_tdep_file(filename: str) -> tuple[list[str], np.ndarray, np.ndarray]:
"""
Tries to read the MC_conc_tdep.out file and returns the ages and
and abundances of that file.
This file actually includes the number densities and not abundances.
Try to estimate the abundances by adding up H, H2 and H+
Currently this file does not include the initial abundances!
.. todo::
* This is quick and dirty. However, the problem is more in |prodimo| because
it is unclear what file is for what and when (depending on parameter configuration)
which file is written.
* This does not deal with the two e- like in the normal mode. This is inconsistent.
Returns
-------
tuple
Returns a tuple containing the list of species, the list of ages, and the
species abundances (in that order)
"""
print("READ: Reading File: ", filename, " ...")
fp = open(filename)
line = fp.readline()
species = line.strip().split()[1:] # the first column is the time
print("INFO: Found " + str(len(species)) + " species")
# Replace some species names
try:
species[species.index("HN2+")] = "N2H+"
except ValueError:
pass # should be fine for python 3
try:
species[species.index("el")] = "e-"
except ValueError:
pass # should be fine for python 3
data = np.loadtxt(filename, skiprows=1)
ages = data[:, 0]
abundances = data[:, 1:]
nH = (
abundances[0, species.index("H")]
+ 2 * abundances[0, species.index("H2")]
+ abundances[0, species.index("H+")]
+ abundances[-1, species.index("H")]
+ 2 * abundances[-1, species.index("H2")]
+ abundances[-1, species.index("H+")]
) / 2.0
abundances = abundances / nH
print(
"WARN: Used a hydrogen number density of "
+ "{:7.5e}".format(nH)
+ " to calculate the abundances. This might not be accurate enough!"
)
return species, ages, abundances
def _read_mc_dynamic_in(filename: str) -> tuple[np.ndarray, list[MCParameterSet]]:
"""Read a `mc_dynamic.in` file and return the ages and parameter settings.
File format:
Line 1 : N (number of time steps)
Line 2 : N age values in years, whitespace-separated
Lines 3+: one whitespace-separated parameter line per time step
(fields match MCParameterSet.params order)
Parameters
----------
filename :
The filename of the file containing the ages and the parameter settings.
Returns
-------
tuple
Returns a tuple containing the list of ages and a list of MCParameterSet objects (in that order).
"""
params_list = []
with open(filename) as f:
f.readline() # skip the first line with nages
ages = list(map(float, f.readline().split()))
for line in f:
line = line.strip()
if not line:
continue
p = MCParameterSet()
p.fill_fromStr(line)
params_list.append(p)
return ages, params_list
def _read_species(filename):
print("READ: Reading File: ", filename, " ...")
f = open(filename, "r")
species = [strr.strip() for strr in f.readlines()]
species[species.index("HN2+")] = "N2H+"
# quick an dirty fix for el_is_sp. If the first and last element are e-
# remove the last one
if species[0] == "e-" and species[-1] == "e-":
del species[-1]
f.close()
return species
def _read_ratecoefficients(filename):
"""
Currently returns just an array with the rate coefficients.
Does not provide information on the reaction etc.
But this is usefull for comparing models.
All rates with values <1.e-200 will be set to 1.e-200 to avoid problems
with zeros
"""
if not os.path.isfile(filename):
return None
print("READ: Reading File: ", filename, " ...")
f = open(filename, "r")
rcs = list()
for line in f:
fields = line.split()
rcs.append(max(1.0e-200, float(fields[1])))
f.close()
return np.array(rcs)
[docs]
def read_mc_final(filename="Molecular_cloud.out", directory=".", name=None):
"""
Reads the final (last timestep) molecular cloud abundances.
Parameters
----------
filename : string
The name of the file containing the abundances (optional).
directory : string
The model directory (optional).
name : string
The name of the model. Will be shown in the plots (optional).
FIXME: ist not consistent with read_mc, e.g. the species names such as N2H+ are not adapted here
FIXME: use numpy arrays for the abundances such as for time-dependent models.
"""
# if name == None:
# dirfields = directory.split("/")
# name = dirfields[len(dirfields) - 1]
mc = Data_mc(name)
f = open(directory + "/" + filename)
lines = f.readlines()
f.close()
species = list()
abun = list()
for line in lines:
fields = line.strip().split()
species.append(fields[0])
abun.append(float(fields[2]))
mc.species = species
mc.abundances = np.array(abun)
return mc
[docs]
def read_mc(
directory: str = ".",
filename: str = "mc_output.out",
agesfile: str = "mc_ages.txt",
speciesfile: str = "mc_species.txt",
imodel: int | None = None,
rcfile="MC_rate_coefficients.txt",
dynamic_input_file: str = "mc_dynamic.in",
name: str | None = None,
):
"""
Read routine for molecular cloud |prodimo| models including the ages and the species list.
Parameters
----------
directory :
The model directory.
filename:
The name of the file containing the abundances for all ages.
Please check what output file you have in your particular model and adapt
it here. (default: `mc_output.txt`)
It is rather confusing how it is done in |prodimo|. So it is hard
to find a clean solution here.
The routine also try now to read the `MC_conc_tdep.out` if there is not
`agesfile` (i.e. "mc_ages.txt"). However, in that file the initial abundances
are not included.
agesfile:
The file with the ages (default: `mc_ages.txt`)
speciesfile:
The file with the species names (default: `mc_species.txt`)
rcfile: string
The file with the calculated rate coefficients (default: `MC_rate_coefficients.txt`)
dynamic_input_file:
The name of the file that holds the input for a dynamic model.
This is needed to read the physical parameters for a dynamic model. (default: `mc_dynamic.in`).
Ignored if the file does not exist.
TODO: currently we assume mc_dynamic implies only one model, which is currently the case in prodimo.
imodel:
if imodel is set, the filenames are adapted to read the particular model (e.g. imodel=2 reads `mc_output_002.txt` instead of `mc_output.txt`)
"""
# if name == None: name=
# dirfields = directory.split("/")
# name = dirfields[len(dirfields) - 1]
if directory.startswith("~"):
directory = os.path.expanduser(directory)
# TODO: make it a general routine (e.g. also use for disk models)
# guess a name if not set
if name is None:
if directory is None or directory == "." or directory == "":
dirfields = os.getcwd().split("/")
else:
dirfields = directory.split("/")
if dirfields[-1] == "": # this is the case for path/
name = dirfields[-2]
else:
name = dirfields[-1]
mc = Data_mc(name)
# first check if it is a dynamic model
if os.path.isfile(os.path.join(directory, dynamic_input_file)):
print("INFO: Reading dynamic model ...")
mc._dynamic_model = True
if imodel is None:
imodel = 1
mc.ages, mc.physicalParameters = _read_mc_dynamic_in(
os.path.join(directory, dynamic_input_file)
)
else:
mc._dynamic_model = False
if (imodel is not None) and (imodel > 0):
mc._imodel = imodel
filename = f"mc_output_{imodel:03d}.out"
agesfile = f"mc_ages_{imodel:03d}.txt"
mc.directory = directory
mc.ratecoefficients = _read_ratecoefficients(os.path.join(directory, rcfile))
# if there is a MC_conc_tdep.out and no mc_ages.txt read the data from there
if not os.path.isfile(directory + "/" + agesfile) and os.path.isfile(
directory + "/MC_conc_tdep.out"
):
mc.species, mc.ages, mc.abundances = read_tdep_file(directory + "/MC_conc_tdep.out")
else:
mc.species = _read_species(directory + "/" + speciesfile)
mc.ages = _read_ages(directory + "/" + agesfile)
# make ages first index, species second
print("READ: Reading File: ", directory + "/" + filename, " ...")
# this is just to be backward compatible
if not os.path.isfile(directory + "/" + filename):
filename = filename.replace(".out", ".txt")
mc.abundances = np.transpose(np.loadtxt(directory + "/" + filename))
# FIXME: quick an dirty fix for el_is_sp true if the len of species is smaller
if len(mc.species) < mc.abundances.shape[1]:
mc.abundances = np.delete(mc.abundances, -1, axis=1)
return mc
[docs]
def read_umist(directory, filename="dc_molecules.dat", name=None):
"""
Reads the results of a UMIST rate13 code model.
Uses the output produced by dc.pl script provided by the UMIST code distribution.
The data is provided as a :class:`prodimopy.read_mc.Data_mc` object.
"""
if name is None:
dirfields = directory.split("/")
name = dirfields[len(dirfields) - 1]
f = open(directory + "/" + filename, "r")
lines = f.readlines()
header = lines[0].strip()
header = header[2:]
species = header.split(" ")
ages = list()
abun = list()
for i in range(1, len(lines)):
line = lines[i].strip()
fields = line.split(" ")
ages.append(float(fields[0]))
abun.append(fields[1:])
out = Data_mc(name)
out.ages = np.array(ages, dtype="|S4").astype(float)
out.ages = np.array(ages)
out.species = species
out.abundances = np.array(abun).astype(float)
return out