"""
.. module:: read
:synopsis: Reads the output data of a |prodimo| model and stores it in a data structure.
.. moduleauthor:: Ch. Rab
"""
import cProfile
from collections import OrderedDict
from collections.abc import MutableMapping
import typing
import glob
import math
import os
from timeit import default_timer as timer
import warnings
# for now use type_extensions to be compatible with python < 3.13
from typing_extensions import deprecated, TypeAlias
from astropy import constants as const
from astropy import units as u
from scipy.interpolate import interp1d
import scipy.integrate as integrate
import f90nml as nml
from tqdm import tqdm
import numpy as np
from numpy.typing import NDArray
import prodimopy.chemistry.network as pcnet
# This activates deprecation warnings, otherwise they are not seen (from e.g. numpy)
warnings.filterwarnings("default", category=DeprecationWarning)
# Type Alias for the 2D grid fields, is just for typing
TGridfloat: TypeAlias = np.ndarray[tuple[int, int], np.float64]
def _do_cprofile(func):
"""
Simple profiling
Taken from here: https://zapier.com/engineering/profiling-python-boss/
Can be used via a decorator: i.e. place '@do_cprofile' before the method you want to profile.
But please also remove it again after profiling.
"""
def profiled_func(*args, **kwargs):
profile = cProfile.Profile()
try:
profile.enable()
result = func(*args, **kwargs)
profile.disable()
return result
finally:
profile.print_stats(sort="time")
return profiled_func
[docs]
class DataStar(object):
"""
Stellar spectrum and several stellar properties.
This object holds data from `StarSpectrum.out` but also from other sources such as `ProDiMo.out`.
It also provides some convenience functions for e.g. `Lstar` or the mass accretion rate derived from the UV luminosity.
"""
def __init__(self, nlam, teff, r, logg, luv):
"""
Constructor for reading StarSpectrum.out.
Other properties are filled later, if possible.
Attributes
----------
"""
self.nlam: int = nlam
""" : number of wavelength points """
self.teff: float = teff
r""" : effective temperature *Unit*: :math:`\mathrm{K}` """
self.r: float = r
r""" : radius *Unit*: :math:`\mathrm{R_\odot}` """
self.logg: float = logg
r""" : surface gravity *Unit*: :math:`\mathrm{cm/s^2}` """
self.lam: NDArray[np.float64] = np.zeros(shape=(nlam))
r""" : Stellar spectrum wavelength *Unit*: :math:`\mathrm{\mu m}` """
self.nu: NDArray[np.float64] = np.zeros(shape=(nlam))
r""" : Stellar spectrum frequency *Unit*: :math:`\mathrm{Hz}` """
self.Inu: NDArray[np.float64] = np.zeros(shape=(nlam))
r""" : Stellar Spectrum intensity *Unit*: :math:`\mathrm{erg/cm^2/s/Hz/sr}` """
self.InuBand: NDArray[np.float64] | None = None
r""" : Stellar Spectrum band averaged (from RTinterpolation.out) *Unit*: :math:`\mathrm{erg/cm^2/s/Hz/sr}` """
self.InuInterpol: NDArray[np.float64] | None = None
r""" : Stellar Spectrum interpolated (J_NU_FROM_RT, from RTinterpolation.out) *Unit*: :math:`\mathrm{erg/cm^2/s/Hz/sr}` """
self.wlInterpol: NDArray[np.float64] | None = None
r""" : wl grid for the interpolated spectrum is in micron. """
self.mass: float | None = None
r""" : stellar mass *Unit*: :math:`\mathrm{M_\odot}` """
self.Lstar: float | None = None
r""" : stellar luminosity (from ProDiMo.out) *Unit*: :math:`\mathrm{L_\odot}`. Does not include accretion luminosity. """
self._Rsun=6.959900000e10 # from ProDiMo init_nature.f
self._Lsun=3.846000000E+33 # from ProDiMo init_nature.f
self._Msun=1.988922500E+33 # from ProDiMo init_nature.f
self._lambs: NDArray[np.float64] | None = None # just to hold the band wl, if RTinterpolation.out can be read
@property
def lumAccr(self):
r""": The accretion luminosity *Unit*: :math:`\mathrm{erg/s}`
Is the excess luminosity :math:`L_\mathrm{accr}` due to accretion, i.e. the part that is added to the pure stellar photosphere spectrum (i.e. no accretion).
We consider here only the part of the spectrum with :math:`\lambda \geq 0.0912` micron.
.. math::
L_\mathrm{accr}=L(\lambda \geq 0.0912\,\mathrm{micron}) - L_\mathrm{star}
Where :math:`L_\mathrm{star}` is the value from `Parameter.in` (read from `ProDiMo.out`),
and :math:`L` is the total luminosity for :math:`\lambda \geq 0.0912` micron.
"""
lumtot = self.calcLum(lammin=0.0912)
return lumtot - (self.Lstar * self._Lsun)
@property
def mdot(self):
r""": The mass accretion rate in :math:`\mathrm{M_\odot/yr}` derived from the UV luminosity.
Uses:
.. math::
L_\mathrm{accr} = G \times M_{*} \times \dot{M}_\mathrm{accr} / R
where we use :attr:`lumAccr` for :math:`L_\mathrm{accr}`.
"""
Rstar = self.r * self._Rsun
Laccr = self.lumAccr
Mstar = self.mass * self._Msun
Maccr = Rstar * Laccr / (const.G.cgs.value * Mstar)
return ((Maccr * u.g / u.s).to(u.Msun / u.yr)).value
@property
def lumTot(self):
""": The total stellar luminosity (full spectrum) *Unit*: erg/s"""
return self.calcLum()
@property
def lum(self):
r""": The "stellar" luminosity :math:`(\lambda \geq 0.0912 \, \mathrm{\mu m})` *Unit*: :math:`\mathrm{erg/s}`
This includes the accretion component if present.
"""
return self.calcLum(lammin=0.0912)
@property
def lumEUV(self):
""": stellar EUV luminosity *Unit*: erg/s
Defined as everything with energies between and 13.6 eV and 100 eV.
"""
lammin = ((0.0999999 * u.keV).to(u.micron, equivalencies=u.spectral())).value
lammax = 0.0911999999
return self.calcLum(lammin=lammin, lammax=lammax)
@property
def lumXray(self):
""": stellar X-ray luminosity *Unit*: erg/s
The luminosity for the part of the spectrum with energies >= 0.1 keV.
"""
lammax = ((0.1 * u.keV).to(u.micron, equivalencies=u.spectral())).value
return self.calcLum(lammax=lammax)
[docs]
def calcLum(self, lammin: float | None = None, lammax: float | None = None) -> float:
"""
Integrates the stellar spectrum from `lammin` to `lammax` and returns the luminosity.
Parameters
----------
lammin :
minimum wavelength in micron. Default: `None` (i.e. first wavelength point)
lammax :
maximum wavelength in micron. Default: `None` (i.e. last wavelength point)
Returns
-------
float
The luminosity from `lammin` to `lammax` in `erg/s`
"""
# Set default wavelength limits
if lammin is None:
lammin = self.lam[0]
if lammax is None:
lammax = self.lam[-1]
# Create mask for wavelength range
mask = (self.lam >= lammin) & (self.lam <= lammax)
# Integrate Inu over frequency
# Inu is in erg/cm²/s/Hz/sr, need to integrate over frequency
# additional pi for because it is /pi
# *-1.0 because nu is in descending order
lum_erg_per_s = (
integrate.trapezoid(self.Inu[mask], x=self.nu[mask])
* -1.0
* 4.0
* math.pi**2
* (self.r * self._Rsun) ** 2
) # surface area
return lum_erg_per_s
def __str__(self):
return (
f"Mstar: {self.mass:g} Msun"
+ "\n"
+ f"Rstar: {self.r:g} Rsun"
+ "\n"
+ f"Teff: {self.teff:g} K"
+ "\n"
+ f"Lstar: {self.Lstar:g} Lsun"
+ "\n"
+ f"L: {self.lum / self._Lsun:g} Lsun"
+ "\n"
+ f"Ltot: {self.lumTot / self._Lsun:g} Lsun"
+ "\n"
+ f"Laccr: {self.lumAccr / self._Lsun:g} Lsun"
+ "\n"
+ f"LEUV: {self.lumEUV:g} erg/s"
+ "\n"
+ f"LXray: {self.lumXray:g} erg/s"
+ "\n"
+ f"Mdot: {self.mdot:g} Msun/yr"
)
[docs]
class DataContinuumObs(object):
"""
Holds the observational data for the continuum (the dust).
Holds the photometric data, spectra (experimental) and radial profiles
(experimental).
.. todo::
- proper documentation
"""
def __init__(self, nlam=None):
if nlam is not None:
self.nlam = nlam
self.lam = np.zeros(shape=(nlam))
self.nu = np.zeros(shape=(nlam))
self.fnuErg = np.zeros(shape=(nlam))
self.fnuJy = np.zeros(shape=(nlam))
self.fnuErgErr = np.zeros(shape=(nlam))
self.fnuJyErr = np.zeros(shape=(nlam))
self.flag = np.empty(nlam, dtype="U2")
else:
self.nlam = None
self.lam = None
self.nu = None
self.fnuErg = None
self.fnuJy = None
self.fnuErgErr = None
self.fnuJyErr = None
self.flag = None
self.specs = None
""" holds a list of spectra if available (wl,flux,error) """
self.R_V = None
self.E_BV = None
self.A_V = None
self.radprofiles = None
[docs]
class DataContinuumImages(object):
"""
Holds the continuum images (image.out) and provides a method to read
one particular image.
The coordinates x,y are the same for all images, and only stored once.
.. todo::
- find a better solution for the multiple inclinations (Problem: don't want to read through the whole file).
"""
def __init__(self, nlam, nx, ny, incl=None, filepath="./image.out"):
"""
Attributes
----------
"""
self.nlam: int = nlam
""" : The number of wavelength points (== number of images) """
self.lams: NDArray[np.float64] = np.zeros(shape=(nlam))
""" : The wavelengths. *UNIT:* micron, *DIMS:* (nlam) """
self.nx: int = nx
""" : The number of x axis points (radial) of the image """
self.ny: int = ny
""" : The number of y axis points (or theta) of the image """
self.x: NDArray[np.float64] = np.zeros(shape=(nx, ny))
""" : x coordinates. *UNIT:* au, *DIMS:* (nx,ny) """
self.y: NDArray[np.float64] = np.zeros(shape=(nx, ny))
""" : y coordinates. *UNIT:* au, *DIMS:* (nx,ny) """
self._filepath = filepath
self._inclinations = list()
self._incidx = 0 # which inclination to use
self._x=list()
self._y=list()
@property
def inclination(self):
return self._inclinations[self._incidx]
def __str__(self):
output = "ny: " + str(self.nx) + " nx: " + str(self.ny) + " nlam: " + str(self.nlam)
output += " incl: " + str(self.inclination)
return output
[docs]
def getImage(self, wl: float, iinc: int = 0) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64],float]:
"""
Reads the intensities at a certain wavelength (image) from the image.out
file.
The image with the closest wavelength to the given wavelength (wl) will be
returned.
Parameters
----------
wl : float
the wavelength in micron of the requested image
iinc : int
the inclination index in case of multiple inclinations. Default ``iinc=0``
Returns
-------
tuple :
x, y coordinates DIM(nx,ny), intensities DIM(nx,ny), UNIT: erg/cm^2/s/Hz/sr, and the exact wavelength Unit: micron
"""
idx = np.argmin(np.abs(self.lams - wl))
# jump to the inclination requested, but check first
if iinc < 0 or iinc >= len(self._inclinations):
raise ValueError("iinc must be between 0 and " + str(len(self._inclinations) - 1))
# also set the current inclination
self._incidx=iinc
skiprows = 6 * (iinc + 1) + (self.nx * self.ny) * iinc
# first time need to init _x,_y
if len(self._x)==0 or len(self._y)==0:
self._x=[None]*len(self._inclinations)
self._y=[None]*len(self._inclinations)
# read the coordinates only once
# ix,iz, x, y ... not required here
if self._x[iinc] is None or self._y[iinc] is None:
xy = np.loadtxt(self._filepath, skiprows=skiprows, usecols=(2, 3), max_rows=self.nx * self.ny)
x = xy[:, 0].reshape(self.nx, self.ny)
y = xy[:, 1].reshape(self.nx, self.ny)
self._x[iinc]=x
self._y[iinc]=y
else:
# FIXME: no so great, but avoids reading the file again
x=self._x[iinc]
y=self._y[iinc]
intens = np.loadtxt(
self._filepath, skiprows=skiprows, usecols=4 + int(idx), max_rows=self.nx * self.ny
)
return x,y,intens.reshape((self.nx, self.ny)), self.lams[idx]
[docs]
class DataSEDAna(object):
"""
Holds the analysis data for the Spectral Energy Distribution (SEDana.out).
"""
def __init__(self, nlam: int, nx: int):
"""
Initializes the SED analysis data.
Parameters
----------
nlam :
The number of wavelength points.
nx :
The number of radial points.
Attributes
----------
"""
self.lams = np.zeros(shape=(nlam))
""" Wavelength points `shape=(nlam)` """
self.r15 = np.zeros(shape=(nlam))
""" The r15 radius (when 15% of the emission are reached. `shape=(nlam)` """
self.r85 = np.zeros(shape=(nlam))
""" The r85 radius (when 85% of the emission are reached. `shape=(nlam)` """
self.z15 = np.zeros(shape=(nlam, nx))
""" The z15 height for each radial grid point (when 15% of the emission are reached from the top). `shape=(nlam,nx)` """
self.z85 = np.zeros(shape=(nlam, nx))
""" The z85 height for each radial grid point (when 85% of the emission are reached from the top). `shape=(nlam,nx)` """
[docs]
class DataSED(object):
"""
Holds the data for the Spectral Energy Distribution (SED).
In case of multiple inclinations the data for all inclinations is stored.
One can select a SED for a given inclination via :func:`~prodimopy.Data_ProDiMo.sedinc`
"""
def __init__(self, nlam: int, distance: float):
"""
Parameters
----------
nlam :
The number of wavelength points.
distance :
The distance of the target in pc.
Attributes
----------
"""
self.nlam: int = nlam
""" : number of wavelength points """
self.distance: float = distance
""" : distance for which the SED was calculated. UNIT: pc """
self.lam: NDArray[np.float64] = np.zeros(shape=(nlam))
""" : the wavelength points in micron DIM: nlam """
self.nu: NDArray[np.float64] = np.zeros(shape=(nlam))
""" : the frequency points in Hz DIM: nlam """
self.sedAna: DataSEDAna | None = None
""" : the SED analysis data. Holds the data for the emission origin """
self.fnuFaceonErg: NDArray[np.float64] = np.zeros(shape=(nlam))
""" : the flux density in erg/cm^2/Hz/s of the face-on SED (i.e. not from raytracing) """
# the quantities that hold the data for all inclinations
self._fnuErgs: list[np.float64] = list()
self._nuFnuWs: list[np.float64] = list()
self._fnuJys: list[np.float64] = list()
self._inclinations: list[np.float64] = list()
self._Lbols: list[np.float64] = list()
self._Tbols: list[np.float64] = list()
self._incidx: int = 0
def __str__(self):
text = str(self.nlam) + "/" + str(self.inclination) + "/" + str(self.distance) + "/" + "\n"
text = text + str(self._inclinations)
return text
@property
def fnuErg(self) -> np.float64:
""": The flux density `F_nu` in erg/s/cm^2/Hz, without extinction."""
return self._fnuErgs[self._incidx]
@property
def nuFnuW(self) -> np.float64:
""": The flux density `nuF_nu` in W/m^2; without extinction."""
return self._nuFnuWs[self._incidx]
@property
def fnuJy(self) -> np.float64:
""": The flux density in Jy; without extinction."""
return self._fnuJys[self._incidx]
@property
def inclination(self) -> np.float64:
""": The current inclination. See :func:`prodimopy.read.sedinc`."""
return self._inclinations[self._incidx]
@property
def Lbol(self) -> np.float64:
""": The bolometric luminosity in `L_sun`. See :func:`setLbolTbol`."""
return self._Lbols[self._incidx]
@property
def Tbol(self) -> np.float64:
""": The bolometric temperature in K. See :func:`setLbolTbol`."""
return self._Tbols[self._incidx]
[docs]
def setLbolTbol(self):
"""
Calculates the bolometric temperature and luminosity for the given
values of the SED. Quite approximate at the moment.
.. todo::
- check the correctness of the calculation
- provide a proper reference for the procedure we follow.
- allow for more flexibility
"""
# FIXME: correctness needs to be verified
# calculate the bolometric luminosity
# *-1.0 because of the ordering of nu
# calculate Tbol see Dunham 2008 Equation 6
# check what is here the proper value
# in particular Tbol is very sensitive to this value
# Lbol does not seem to change much (e.g. if 0.1 is used instead)
# for the moment exclude the shortest wavelength as these is most likely scattering
mask = self.lam > 0.2
self._Lbols[self._incidx] = integrate.trapezoid(self.fnuErg[mask], x=self.nu[mask]) * -1.0
self._Lbols[self._incidx] = (
self.Lbol * 4.0 * math.pi * (((self.distance * u.pc).to(u.cm)).value) ** 2
)
self._Lbols[self._incidx] = self.Lbol / (const.L_sun.cgs).value
self._Tbols[self._incidx] = (
1.25e-11
* integrate.trapezoid((self.nu[mask] * self.fnuErg[mask]), x=self.nu[mask])
/ integrate.trapezoid(self.fnuErg[mask], x=self.nu[mask])
)
[docs]
def extinction(self, sedObs: DataContinuumObs, extmodel: str = "F99") -> NDArray[np.float64]:
"""
Return the extinction for the given model.
.. note::
Please cite https://joss.theoj.org/papers/10.21105/joss.07023
if you use this function in your work.
Parameters
----------
sedObs :
The observed data for the continuum. Needs the R_V and E_BV values.
extmodel :
The model to use for the extinction. Default: "F99"
For supported models see `astropy dust_extinction <https://dust-extinction.readthedocs.io/en/stable/dust_extinction/choose_model.html#parameter-dependent-average-curves>`_
Returns
-------
: The extinction values for the given model. `shape=(nlam)` E.g. do `flux*extinction(model.sedObs)`.
"""
import dust_extinction.parameter_averages as extmodels
if sedObs is None:
raise ValueError("No sedObs data for the extinction available.")
tmpm = getattr(extmodels, extmodel.upper(), None)
if tmpm is None:
raise ValueError("Extinction model not supported: " + extmodel.upper())
print("INFO: Applying extinction model:", extmodel)
extm = tmpm(sedObs.R_V)
ext = np.ones(self.lam.shape)
ist = np.argmin(np.abs(self.lam - 1.0 / extm.x_range[1]))
ien = np.argmin(np.abs(self.lam - 1.0 / extm.x_range[0]))
ext[ist:ien] = extm.extinguish(self.lam[ist:ien] * u.micron, Ebv=sedObs.E_BV)
return ext
[docs]
class DataLineEstimateRInfo(object):
"""
Data container for the extra radial information for a single line estimate.
This object corresponds to one line of the radial information in `FlineEstimates.out`,
i.e. it represents the vertical column at grid point `ix`.
"""
def __init__(self, ix, iz, Fcolumn, tauLine, tauDust, z15, z85, ztauD1, Ncol):
"""
Attributes
----------
"""
self.ix: int = ix
""" : The x-coordinate index. """
self.iz: int = iz
""" : The z-coordinate index. """
self.Fcolumn: float = Fcolumn
""" : the line flux for the column (check again, i guess the iz coordinate is irrelevant for that); *UNIT:* W/m^2 """
self.tauLine: float = tauLine
""" : the total vertical optical depth of the line measured from tauDust=1 upwards f(r) """
self.tauDust: float = tauDust
""" : the total vertical optical depth of the dust at the wl of the line as f(r) """
self.z15: float = z15
""" : z-level where the line flux reaches 15% of the total flux as f(r) (integrated from top to bottom of the disk); *UNIT:* au """
self.z85: float = z85
""" : z-level where the line flux reaches 85% as f(r) (integrated from top to bottom of the disk); *UNIT:* au """
self.ztauD1: float = ztauD1
""" : z-level where `taudust_ver(lambda_line)=1`; *UNIT:* au
Is `None` for FlineEstimates version < 3
"""
self.Ncol: float = Ncol
""" : The column density of the species as f(r) measured from `ztauD1` upwards; UNIT: |cm^-2|
It considers both halves of the disks (i.e. for the case of tauDust <1 ) so the values can be different to
:attr:`~prodimopy.read.Data_ProDiMo.cdnmol` where only one half of the disk is considered.
Is `None` for FlineEstimates version < 3
"""
[docs]
class DataLineEstimate(object):
"""
Data container for a single line estimate.
This includes also the radial information (e.g. line emitting region).
The radial information is only read and put into memory when accessed.
"""
def __init__(self, ident: str, wl: float, jup: int, jlow: int, flux: float):
"""
Attributes
----------
"""
self.ident: str = ident
""" The line identification (species name) """
self.wl: float = wl
""" The wavelength of the line in ``micron`` """
self.jup: int = jup
""" Upper level as defined in |prodimo| (not necessarily the real one!) """
self.jlow: int = jlow
""" Lower level as defined in |prodimo| (not necessarily the real one!). """
self.flux: float = flux
""" The integrated line flux in ``W/m^2`` """
self._rInfo: list[DataLineEstimateRInfo] | None = None
""" The extra radial information (i.e. info about the line emitting region). """
self._posrInfo = None
""" stores the position of the radial information to access it later on """
self._model: Data_ProDiMo # a reference to the model, needed for a couple of things,
@property
def frequency(self) -> float:
""": Frequency of the line in ``GHz``."""
return (self.wl * u.micrometer).to(u.GHz, equivalencies=u.spectral()).value
@property
def flux_Jy(self) -> float:
""": The flux of the line in [Jansky km |s^-1|]."""
return _flux_Wm2toJykms(self.flux, self.frequency)
@property
def rInfo(self) -> list[DataLineEstimateRInfo]:
"""
The radial information for the line estimate (e.g. used for line origin plots).
It used lazy initialisation, e.g. if the rInfo object is accessed the firs time the
data is read from `FlineEstimates.out`.
Returns
-------
list[DataLineEstimateRInfo]
A list with ``nx`` entries.
"""
if self._rInfo is None:
self._read_lineEstimateRinfo()
return self._rInfo
@property
def rInfo_Fcolumn(self) -> np.ndarray:
""": The line flux per column as a numpy array, one entry per radial grid point (see :attr:`~prodimopy.read.DataLineEstimateRInfo.Fcolumn`). DIM: (nx) """
return np.array([r.Fcolumn for r in self.rInfo])
@property
def rInfo_tauLine(self) -> np.ndarray:
""": The total vertical optical depth of the line per radial grid point as a numpy array (see :attr:`~prodimopy.read.DataLineEstimateRInfo.tauLine`). DIM: (nx) """
return np.array([r.tauLine for r in self.rInfo])
@property
def rInfo_tauDust(self) -> np.ndarray:
""": The total vertical optical depth of the dust at the line wavelength per radial grid point as a numpy array (see :attr:`~prodimopy.read.DataLineEstimateRInfo.tauDust`). DIM: (nx) """
return np.array([r.tauDust for r in self.rInfo])
@property
def rInfo_Ncol(self) -> np.ndarray:
""": The column density of the species per radial grid point as a numpy array (see :attr:`~prodimopy.read.DataLineEstimateRInfo.Ncol`). DIM: (nx) """
return np.array([r.Ncol for r in self.rInfo])
def _read_lineEstimateRinfo(self):
"""
Reads the additional Rinfo data for the given lineEstimate.
"""
f, dummy = _getfile(self._model._fpFlineEstimates, None, binary=True, suppressMsg=True)
if f is None:
return None
f.seek(self._posrInfo, 0)
nxrange = list(range(self._model.nx))
self._rInfo = list()
for j in nxrange:
fieldsR = f.readline().decode().split()
ix = int(fieldsR[0].strip())
iz = int(fieldsR[1].strip())
Fcolumn = float(fieldsR[2])
tauLine = _convToFloatCheck(fieldsR[3], ix, iz, "tauLine")
tauDust = _convToFloatCheck(fieldsR[4], ix, iz, "tauDust")
z15 = (float(fieldsR[5]) * u.cm).to(u.au).value
z85 = (float(fieldsR[6]) * u.cm).to(u.au).value
ztauD1 = None
Ncol = None
if self._model._versionFlineEstimates == 3:
ztauD1 = _convToFloatCheck(fieldsR[7], ix, iz, "ztauD1")
Ncol = float(fieldsR[8])
# ix -1 and iz-1 because in python we start at 0
rInfo = DataLineEstimateRInfo(
ix - 1, iz - 1, Fcolumn, tauLine, tauDust, z15, z85, ztauD1, Ncol
)
self._rInfo.append(rInfo)
f.close()
[docs]
def luminosityErgs(self, distance: float) -> float:
"""
Returns the luminosity in erg/s for the given distance
simply calls :func:`~prodimopy.read.DataLineEstimate.luminosityWatt` and converts the result to erg/s.
"""
return ((self.luminosityWatt(distance) * u.Watt).to(u.erg / u.s)).value
[docs]
def luminosityWatt(self, distance: float) -> float:
"""
Parameters
----------
distance : float
The distance in pc, to calculate the line luminosity.
Returns
-------
float
Returns the line luminosity in Watt
"""
return (4.0 * math.pi * (distance * u.pc).to(u.m) ** 2 * self.flux * u.Watt / u.m**2).value
def __str__(self):
return f"LineEst: {self.ident:16} wl={self.wl:13.6f} μm flux={self.flux:10.3e} W/m^2 (up={self.jup:5} low={self.jlow:5})"
[docs]
class DataLineProfile:
"""
Data container for a spectral line profile for a single spectral line.
Attributes
----------
"""
def __init__(self, nvelo, restfrequency=None):
self.nvelo: int = nvelo
""" : number of velocity points of the profile. """
self.restfrequency: float = restfrequency
""" : The rest frequency of the line. Useful for conversions. Optional. *UNIT:* GHz """
self.velo: NDArray[np.float64] = np.zeros(shape=(self.nvelo))
""" : The velocity grid of the line profile. *UNIT:* kms/s, *DIMS:* (nvelo) """
self._flux = np.zeros(shape=(self.nvelo))
self._flux_dust = np.zeros(shape=(self.nvelo))
self._flux_conv = np.zeros(shape=(self.nvelo))
self._flux_unit = "Jy"
def __flux_unitconv(self, flux):
"""Internal function to convert the flux to the correct unit."""
if self.flux_unit == "ErgAng":
return (
(flux * u.Jy)
.to(
u.erg / u.s / u.cm**2 / u.angstrom,
equivalencies=u.spectral_density(self.restfrequency * u.GHz),
)
.value
)
elif self.flux_unit == "mJy":
return flux * 1000.0
else:
return flux
@property
def flux(self) -> NDArray[np.float64]:
""": The flux at each velocity point. *UNIT:* given by :attr:`flux_unit`, *DIMS:* (nvelo)"""
return self.__flux_unitconv(self._flux)
@flux.setter
def flux(self, val):
self._flux = val
@property
def flux_dust(self) -> NDArray[np.float64]:
""": The flux of the dust (continuum) at each velocity point. *UNIT:* given by :attr:`flux_unit`, DIMS: (nvelo)"""
return self.__flux_unitconv(self._flux_dust)
@flux_dust.setter
def flux_dust(self, val):
self._flux_dust = val
@property
def flux_conv(self) -> NDArray[np.float64]:
""": The flux at each velocity point for the convolved spectrum. UNIT: given by :attr:`flux_unit`, DIMS: (nvelo)"""
return self.__flux_unitconv(self._flux_conv)
@flux_conv.setter
def flux_conv(self, val):
self._flux_conv = val
@property
def frequency(self) -> float:
""": The frequencies according to the velocity grid. Is relative to the rest frequency. UNIT: GHz
FIXME: restfreq can be None ...
"""
return (
(self.velo * u.km / u.s)
.to(u.GHz, equivalencies=u.doppler_optical(self.restfrequency * u.GHz))
.value
)
@property
@deprecated("Use flux_unit instead")
def fluxErgAng(self):
"""
The fluxes in units of erg/s/cm^2/Angstrom.
.. deprecated:: 2.4.0
Use :attr:`flux_unit` instead.
"""
return (
(self._flux * u.Jy)
.to(
u.erg / u.s / u.cm**2 / u.angstrom,
equivalencies=u.spectral_density(self.restfrequency * u.GHz),
)
.value
)
@property
def flux_unit(self) -> str:
""": The flux unit for the line profile (e.g. for plotting).
Possible choices: ``Jy`` (default), ``mJy`` and ``ErgAng``.
"""
return self._flux_unit
@flux_unit.setter
def flux_unit(self, val: str):
if val != "Jy" and val != "mJy" and val != "ErgAng":
raise ValueError(f"flux unit {val} not supported")
self._flux_unit = val
[docs]
def convolve(self, R):
"""
Convolves the given profile to the spectral resolution R.
The profile is convolved with a Gaussian where R determines the FWHM of that
Gaussian. The results are stored in :attr:`flux_conv`
Parameters
----------
R :
The spectral resolution.
Returns
-------
float :
The FWHM of the Gaussian in units of `km/s` i.e. the spectral resolution.
"""
facFWHM = 2.355 # for getting the FWHM of a gaussian (i.e. stddev to FWHM
if R is None or R <= 0.0:
self._flux_conv = self._flux
return
delta_v = (const.c / R).to(u.km / u.s).value
delta_v = delta_v / facFWHM # need stdv for the gaussian but R is interpreted as the FWHM
gaussian = np.exp(-((self.velo) ** 2.0) / (2.0 * delta_v**2.0))
# FIXME: Does not work if the continuum is not removed
# also the 0 might not be the continuum
flux = self._flux[:] - self._flux[0]
norm = integrate.trapezoid(flux, self.velo)
flux_convolved = np.convolve(flux, gaussian, "same")
flux_convolved *= norm / integrate.trapezoid(flux_convolved, self.velo)
self._flux_conv = flux_convolved + self._flux[0]
return delta_v * facFWHM
def __str__(self):
return "nvelo: " + str(self.nvelo)
[docs]
class DataLine(object):
"""
Data container for a single spectral line read from `line_flux.out`.
All lines can then be accessed via `model.lines`, where model is a :class:`~prodimopy.read.Data_ProDiMo` object.
To print a decent looking list of all lines (in the notebook or console) one can use the following code:
.. code-block:: python
print(*model.lines,sep="\\n")
This works also with a list of lines selected via :func:`~prodimopy.read.Data_ProDiMo.selectLines`:.
But of course a simple for loop will also do the trick.
Attributes
----------
"""
def __init__(self):
self.wl: float = 0.0
""" : Rest-wavelength of the line. UNIT: `micron` """
self.frequency: float = 0.0
""" : Rest-frequency of the line. UNIT: `GHz` """
self.prodimoInf: str = ""
""" : Some information String from |prodimo| (i.e. transition) """
self.species: str = ""
""" : The chemical species of the line. Derived from ident. """
self.ident: str = ""
""" : The identification of the line (i.e. as given in `LineTransferList.in`) """
self.distance: float | None = None
""" : The distance used for the line calculations. UNIT: pc """
self._fluxs: list[float] = list()
""" : The integrated line flux. UNIT: W/m^2 """
self._fconts: list[float] = list()
""" : The continuum flux at the line position. UNIT: Jy """
self._profiles: list[DataLineProfile] = list()
""" : The line profile(s) for this particular line for each inclination. """
self._inclinations: list[float] = list()
""" : The list of inclinations to use for the line calculations. UNIT: deg """
self._incidx: int = 0
""" : The current inclination index to use. """
@property
def flux(self) -> float:
""": The integrated line flux for the current inclination. UNIT: W/m^2"""
return self._fluxs[self._incidx]
@property
def fcont(self) -> float:
""": The continuum flux at the line position for the current inclination. UNIT: Jy"""
return self._fconts[self._incidx]
@property
def profile(self) -> list[DataLineProfile]:
""": The line profile(s) for this particular line for each inclination."""
return self._profiles[self._incidx]
@property
def inclination(self) -> float:
""": The current inclination. Unit: degrees."""
return self._inclinations[self._incidx]
@property
def flux_Jy(self) -> float:
""": The flux of the line in Jansky km |s^-1|."""
return _flux_Wm2toJykms(self.flux, self.frequency)
@property
def fcontErgAng(self) -> float:
""": The flux of the continuum in erg/s/cm^2/Angstrom"""
return (
(self.fcont * u.Jy)
.to(
u.erg / u.s / u.cm**2 / u.angstrom,
equivalencies=u.spectral_density(self.frequency * u.GHz),
)
.value
)
@property
def luminosityErgs(self):
""": Returns the luminosity in erg/s"""
return ((self.luminosityWatt * u.Watt).to(u.erg / u.s)).value
@property
def luminosityWatt(self):
""": Returns the line luminosity in Watt"""
return (
4.0 * math.pi * (self.distance * u.pc).to(u.m) ** 2 * self.flux * u.Watt / u.m**2
).value
@property
def FWHM(self) -> float:
"""
Calculate the Full Width at Half Maximum (FWHM) of a flux profile.
This method computes the FWHM by finding the velocity range where the flux
is above half of the maximum value. The calculation searches for the leftmost
and rightmost points where the flux crosses the half-maximum threshold.
Parameters
----------
convolved : bool
If True, the FWHM is calculated from the convolved flux profile.
Returns
-------
float
The FWHM in km/s, calculated as the difference between the
right and left velocity values at half maximum (vright - vleft).
Notes
-----
- The half maximum is calculated as the average of the maximum and minimum
flux values: 0.5 * (max(flux) + min(flux)) , so the continuum is taken care off
- The left boundary search starts from index 0 and stops when flux exceeds
half maximum or reaches the end of the array
- The right boundary search starts from the last index and stops when flux
exceeds half maximum, or reaches index 0
- The method assumes that vright > vleft in the velocity profile
"""
return self._FWHM(convolved=False)
@property
def FWHM_convolved(self) -> float:
"""
Calculate the Full Width at Half Maximum (FWHM) of a convolved flux profile.
For details see :func:`FWHM`
"""
return self._FWHM(convolved=True)
def _FWHM(self, convolved: bool = False) -> float:
"""
Just a helper routine, so that one can still use the property decorator
"""
if convolved:
flux_profile = self.profile.flux_conv
else:
flux_profile = self.profile.flux # -self.profile.flux[0] # do continuum subtraction
velo_profile = self.profile.velo
half_max = 0.5 * (np.max(flux_profile) + np.min(flux_profile))
# search from the left side, but do not assume that the half max actually is on the left side.
ileft = 0
while flux_profile[ileft] <= half_max and ileft < len(flux_profile) - 1:
ileft += 1
# now from the right side
iright = len(velo_profile) - 1
while flux_profile[iright] <= half_max and iright > 0:
iright -= 1
return velo_profile[iright] - velo_profile[ileft]
[docs]
def convolve(self, R: float) -> float:
"""
Convolves all the profiles (each inclination) for this line.
simply calls :func:`DataLineProfile.convolve`
Parameters
----------
R :
The desired spectral resolution.
Returns
-------
float :
The FWHM of the Gaussian in units of `km/s` i.e. the spectral resolution.
"""
FWHM = None
for i in range(len(self._inclinations)):
FWHM = self._profiles[i].convolve(R)
return FWHM
def __str__(self):
return f"Line: {self.ident:16} wl={self.wl:13.6f} μm flux={self.flux:10.3e} W/m^2 inc={self.inclination:4.1f}°"
[docs]
class ModelParameters(MutableMapping):
"""
Access to the Parameters of a model. Reads Parameter.out and provides
some utility functions fo access Parameters.
"""
def __init__(self, filename="Parameter.out", directory="."):
"""
Opens the file and fills a dictionary with all Parameters.
The f90nml package is used, so type conversion is properly done.
This dictionary is readonly.
The get routine has some special treatment for certain parameters (e.g. return a list
with the correct len). However, special treatment for some fields might be missing.
Parameters
----------
filename : string
The Filename of the Parameter file. Default: Parameter.out
directory : string
The directory that contains the |prodimo| model output. Default: "."
..todo::
- make some utility routine, mainly unit conversion, for fields like Mdisk etc.
"""
fparams, fpath = _getfile(filename, directory=directory)
self.mapping: nml.NameList = nml.read(fparams)["para"]
def __getitem__(self, key):
# some special treatments ... because we do not know the length of that array
if key.upper() == "AGE_DISK":
if self.mapping["time_dependent"] or self.mapping["time_chem_disk"]:
nage = self.mapping["N_AGE"]
return self.mapping[key][0:nage]
else:
return np.inf
elif key.upper() == "INCL":
if "nincl" in self.mapping.keys():
nincl = self.mapping["nincl"]
return self.mapping[key][0:nincl]
else:
return self.mapping[key]
else:
return self.mapping[key]
def __delitem__(self, key):
print("Don't delete Parameters ...")
def __setitem__(self, key, value):
print("Updating ModelParameters is not allowed ...")
def __iter__(self):
return iter(self.mapping)
def __len__(self):
return len(self.mapping)
def __str__(self):
out = ""
for key in self.mapping:
out += key + " = " + str(self.mapping[key]) + "\n"
return out
[docs]
class DataFLiTsSpec(object):
"""
Data container for a FLiTs spectrum.
Currently provides only the wavelength grid and the flux in Jy, as produced
by FLiTs.
"""
def __init__(self):
self.wl: np.ndarray[tuple[int], np.float64]
""" Wavelength grid in micron. """
self.flux: np.ndarray[tuple[int], np.float64]
""" Flux in Jy. """
self.flux_cont: np.ndarray[tuple[int], np.float64]
""" Continuum flux of the spectrum in Jy. """
self.conv_wl: np.ndarray[tuple[int], np.float64] | None = None
""" Convolved Wavelength grid in micron. """
self.conv_flux: np.ndarray[tuple[int], np.float64] | None = None
""" Convolved Flux in Jy. """
self.conv_flux_cont: np.ndarray[tuple[int], np.float64] | None = None
""" Convolved Continuum flux of the spectrum in Jy. """
self.conv_R: NDArray[np.float64] | None = None
""" : Convolved spectrum. Unit: Jy."""
[docs]
def convolve(self, specR, sample=1, contReturn=False, inplace=False):
"""
Returns a convolved version of the Spectrum.
Does not change the original FLiTs spectrum.
Parameters
----------
specR : int
The desired spectral resolution.
Returns
-------
(tuple):
tuple containing: wls(array_like): array of wavelength points in micron,
flux(array_like): flux values for each wavelength point in Jansky.
.. todo::
allow for different units.
"""
print("INFO: convolve FLiTs spectrum ... ")
from astropy.convolution import convolve_fft
from astropy.convolution import Gaussian1DKernel
wl = self.wl
flux = self.flux
flux_cont = self.flux_cont
# Make a new wl grid
wl_log = np.logspace(
np.log10(np.nanmin(wl)), np.log10(np.nanmax(wl)), num=np.size(wl) * sample
)
# Find stddev of Gaussian kernel for smoothing
# taken from here https://github.com/spacetelescope/pysynphot/issues/78
R_grid = (wl_log[1:-1] + wl_log[0:-2]) / (wl_log[1:-1] - wl_log[0:-2]) / 2
sigma = np.median(R_grid) / specR
if sigma < 1:
sigma = 1
# Interpolate on logarithmic grid
f_log = np.interp(wl_log, wl, flux)
f_cont_log = np.interp(wl_log, wl, flux_cont)
# in the idl script this is interpreted as the FWHM,
# but the convolution routine wants stddev use relation
# FWHM=2*sqrt(2ln2)*stddev=2.355/stddev
# this should than be consistent with the result from the
# ProDiMo idl script
gauss = Gaussian1DKernel(stddev=sigma / 2.355)
flux_conv = convolve_fft(f_log, gauss)
flux_cont_conv = convolve_fft(f_cont_log, gauss)
# Interpolate back on original wavelength grid
flux_sm = np.interp(wl, wl_log, flux_conv)
flux_cont_sm = np.interp(wl, wl_log, flux_cont_conv)
cut = 2 * int(sigma)
flux_smc = flux_sm[cut : (len(flux_sm) - cut)]
flux_cont_smc = flux_cont_sm[cut : (len(flux_cont_sm) - cut)]
wlc = wl[cut : (len(wl) - cut)]
self.conv_R = specR
self.conv_wl = wlc
self.conv_flux = flux_smc
self.conv_flux_cont = flux_cont_smc
if not inplace:
if contReturn:
return wlc, flux_smc, flux_cont_smc
else:
return wlc, flux_smc
[docs]
class DataGas(object):
"""
Holds the data for the gas (mainly from `dust_opac.out`).
.. todo::
- currently only the init cs is read, not the x,y data
"""
def __init__(self, nlam):
self.nlam = nlam
self.lam = np.zeros(shape=(nlam))
self.energy = np.zeros(
shape=(nlam)
) # for convenience as often energy is also used for gas [eV]
self.abs_cs = np.zeros(shape=(nlam)) # unit cm^2/H
self.sca_cs = np.zeros(shape=(nlam)) # unit cm^2/H
[docs]
class DataDust(object):
"""
Holds the data for the dust (mainly from dust_opac.out)
.. todo::
- dust composition is not yet read.
Attributes
----------
"""
def __init__(self, amin, amax, apow, nsize, nlam):
self.amin: float = amin
""" : The minimum grain size of the size distribution. UNIT: micron """
self.amax: float = amax
""" : The maximum grain size of the size distribution. UNIT: micron """
self.apow: float = apow
""" : The power-law exponent of the grain size distribution. """
self.nsize: int = nsize
""" : The number for grain size bins """
self.lam: NDArray[np.float64] = np.zeros(shape=(nlam))
""" : The wavelength grid for the dust opacities. UNIT: `micron` DIM: (nlam) """
self.energy: NDArray[np.float64] = np.zeros(shape=(nlam))
""" : The energy grid for the dust opacities (for convenience). UNIT: eV DIM: (nlam) """
self.kext: NDArray[np.float64] = np.zeros(shape=(nlam))
""" : The dust extinction coefficient for each wavelength per g of dust. UNIT: |cm^2g^-1| DIM: (nlam) """
self.kabs: NDArray[np.float64] = np.zeros(shape=(nlam))
""" : The dust absorption coefficient for each wavelength per g of dust. UNIT: |cm^2g^-1| DIM: (nlam) """
self.ksca: NDArray[np.float64] = np.zeros(shape=(nlam))
""" : The dust scattering coefficient for each wavelength per g of dust. UNIT: |cm^2g^-1| DIM: (nlam) """
self.ksca_an: NDArray[np.float64] = np.zeros(shape=(nlam))
""" : The dust anisotropic (approximation) scattering coefficient for each wavelength per g of dust. UNIT: |cm^2g^-1| DIM: (nlam) """
self.asize: NDArray[np.float64] | None = None
""" : The grain size for each size bin. UNIT: micron DIM: (nsize) """
self.sigmaa: NDArray[np.float64] | None = None
""" : The radial surface density profiles for each individual grain. size UNIT: |gcm^-2| DIM: (nsize,nr) """
# FIXME: make this lazy variables
self.fsize: NDArray[np.float64] | None = None
""" : The normalized dust size distribution function. DIM: (nsize,nr,nz) """
self.fsize_rho: NDArray[np.float64] | None = None
""" : The dust size distribution function. Density for each grain size bin. UNIT: UNIT: |gcm^-3| DIM: (nsize,nr,nz) """
[docs]
class DataElements(object):
"""
Data for the Element abundances (the input). Holds the data from `Elements.out`.
The data is stored as `OrderedDict` (same order as in `Elements.out`) with the element names as keys.
Attributes
----------
"""
def __init__(self):
self.abun: OrderedDict[str, float] = OrderedDict()
""" : Ordered dictionary holding the element abundances relative to hydrogen. """
self.abun12: OrderedDict[str, float] = OrderedDict()
""" : abundances in the +12 notation """
self.massRatio: OrderedDict[str, float] = OrderedDict()
""" : mass ratios """
self.amu: OrderedDict[str, float] = OrderedDict()
""" : atomic mass unit """
self.muHamu: float | None = None
""" : `rho = muH*n<H>` with `muH/amu` from `Elements.out` """
def __str__(self):
output = "name 12 X/H\n"
for key in self.abun12.keys():
output = (
output
+ "{:5s}".format(key)
+ " "
+ "{:6.3f}".format(self.abun12[key])
+ " "
+ "{:6.3e}".format(self.abun[key])
+ "\n"
)
return output
[docs]
class DataSpecies(object):
"""
Data for the Species (the input).
Holds the data from `Species.out`.
The data is stored as OrderedDict with the element names as keys.
Attributes
----------
"""
def __init__(self):
self.mass: OrderedDict[str, float] = OrderedDict()
""" : The mass of the species *UNIT:* g. """
self.charge: OrderedDict[str, int] = OrderedDict()
""" : The charge of the species *UNIT:* e. """
self.chemPot: OrderedDict[str, float] = OrderedDict()
""" : The chemical potential as determined by |prodimo|. """
[docs]
def get_spdata(self, name) -> tuple[float, int, float]:
"""
Returns all data of the species with `name`.
Parameters
----------
name : str or int
The name of the species.
if int then use it as index
Returns
-------
(mass,charge,chemPot)
"""
if isinstance(name, int):
return (
list(self.mass.values())[name],
list(self.charge.values())[name],
list(self.chemPot.values())[name],
)
else:
return (self.mass[name], self.charge[name], self.chemPot[name])
def __str__(self):
output = "Number of species: " + str(len(self.mass.keys())) + "\n"
output += "{:16s} {:10s} {:5s} {:s}".format("Name", "mass [g]", "charge", "chemPot [eV]\n")
for name, mass, charge, chemPot in zip(
self.mass.keys(), self.mass.values(), self.charge.values(), self.chemPot.values()
):
output += "{:<16s} {:7.2e} {:5d} {:12.4f}".format(name, mass, charge, chemPot) + "\n"
return output
[docs]
class DataBgSpec(object):
"""
Background field input spectrum.
.. todo::
- is not yet included in Data_ProDiMo
"""
def __init__(self, nlam):
self.nlam = nlam
self.lam = np.zeros(shape=(nlam))
self.nu = np.zeros(shape=(nlam))
self.Inu = np.zeros(shape=(nlam))
[docs]
class Chemistry(object):
"""
Data container for chemistry analysis.
Holds the information for one particular molecule.
Also provides some convenience functions for further analysis.
"""
def __init__(self, name):
"""
Parameters
----------
name : string
The name of the model (can be empty).
Attributes
----------
"""
self.name: str = name
""" The name of the model (can be empty) """
self.totfrate: NDArray
""" Total formation rate at each spatial grid point. """
self.totdrate: NDArray
""" Total destruction rate at each spatial grid point. """
self.gridf: NDArray
""" Contains for each individual grid point a sorted list of the formation reactions including the index (0) and the rate (1) """
self.gridd: NDArray
""" Contains for each individual grid point a sorted list of the destruction reactions including the index (0) and the rate (1) """
self.fridxs: list
""" List of all formation reaction indices for this species. """
self.dridxs: list
""" List of all destruction reaction indices for this species. """
self.species: str
""" name of species for which the chamanalysis should be done """
self.model: Data_ProDiMo
""" A reference to the model, for which we do the analysis. """
[docs]
def reac_rates_ix_iz(
self,
ix: int = None,
iz: int = None,
locau: tuple[float, float] = None,
lowRatesFrac: float = 1.0e-3,
):
"""
Function that analyse the Chemistry manually via a point-by-point analysis for a given species.
Shows the most important formation and destruction rates for the given point ix,iz (or in au via Parameter `locau`).
Parameters
----------
ix : x-index corresponding to desired radial location in grid, starting at 0
iz : z-index corresponding to desired radial location in grid, starting at 0
locau :
the desired coordinates in au ``(x,z)``. The routine then finds the closest
grid point for those coordinates.
lowRatesFrac :
Only rates with rate/total_rate > lowRatesFrac are printed. Default: 1.e-3
This is useful to avoid printing all the reactions that are not important.
"""
model = self.model
if locau is not None:
ix = np.argmin(np.abs(model.x[:, 0] - locau[0]))
iz = np.argmin(np.abs(model.z[ix, :] - locau[1]))
if ix is None or iz is None:
print("Please provide either ix,iz or locau!")
return
print("Analysing point x=", str(model.x[ix, iz]) + " au z=" + str(model.z[ix, iz]) + " au")
print(" Detailed reaction rates for: %10s" % self.species)
print(
"------------------------------------------------------------------------------------------------------"
)
print(" grid point = %i %i" % (ix, iz))
print(" r,z [au] (cylindrical) = %.3f %.4f" % (model.x[ix, iz], model.z[ix, iz]))
print(
" n<H>,nd [cm^-3] = %.1e %.1e" % (model.nHtot[ix, iz], model.nd[ix, iz])
)
print(" Tgas,Tdust [K] = %.1e %.1e" % (model.tg[ix, iz], model.td[ix, iz]))
print(
" AV_rad,AV_ver = %.1e %.1e"
% (model.AVrad[ix, iz], model.AVver[ix, iz])
)
print(
" %10s" % self.species
+ " abundance = %e" % model.getAbun(self.species)[ix, iz]
)
print(
"------------------------------------------------------------------------------------------------------"
)
print(" Total form. rate [cm^-3 s^-1] = {:10.2e}".format(self.totfrate[ix, iz]))
for i, ridx in enumerate(self.gridf[ix, iz, 0]):
rate = self.gridf[ix, iz, 1][i]
if rate / self.totfrate[ix, iz] > lowRatesFrac: # don't print low rates
print(
self.reac_to_str(
model.chemnet.reactions[ridx - 1],
list(self.fridxs).index(ridx) + 1,
rate=rate,
)
)
print(
"------------------------------------------------------------------------------------------------------"
)
print(" Total dest. rate [cm^-3 s^-1] = {:10.2e}".format(self.totdrate[ix, iz]))
for i, ridx in enumerate(self.gridd[ix, iz, 0]):
rate = self.gridd[ix, iz, 1][i]
if rate / self.totdrate[ix, iz] > lowRatesFrac: # don't print unimportant rates
print(
self.reac_to_str(
model.chemnet.reactions[ridx - 1],
list(self.dridxs).index(ridx) + 1,
rate=rate,
)
)
print(
"------------------------------------------------------------------------------------------------------"
)
print("")
[docs]
def reac_to_str(self, reac, idx, rate=None):
"""
Converts a Reaction to the output format we want for the chemanalysis.
Parameters
----------
reac : :class:`prodimopy.chemistry.network.Reaction`
"""
# build the prods string assuming max three products
reacstr = "".join(["{:13s}".format(p) for p in reac.reactants])
reacstr += "".join(["{:13s}".format("") for p in range(3 - len(reac.reactants))])
prodstr = "".join(["{:13s}".format(p) for p in reac.products])
prodstr += "".join(["{:13s}".format("") for p in range(4 - len(reac.products))])
if rate is not None:
ratestr = "{:9.2e}".format(rate)
else:
ratestr = ""
return "{:5d} {:6d} {:2s} {:<s} -> {:<s} {:<s}".format(
idx, reac.id, reac.type, reacstr, prodstr, ratestr
)
[docs]
def get_reac_grid(self, level, rtype):
"""
Gets the counting index of the `x` important Reaction. If it does not exist
it is set to 0.
Parameters
----------
level : int
The level of importance `1` means most important, `2` second most important etc.
type : str
Formation (pass `f`) or destruction Reaction (pass `d`)
Returns
-------
tuple :
And array of shape (model.nx,model.nz) where the first one contains the index
of the Reactions for the list of reactions from chemanalysis (the output file)
and the second one the rate itself (for this Reaction at each point).
"""
if rtype == "d":
grid = self.gridd
ridxs = self.dridxs
else:
grid = self.gridf
ridxs = self.fridxs
nx = grid.shape[0]
nz = grid.shape[1]
gidx = np.zeros(shape=(nx, nz), dtype=int)
grate = np.zeros(shape=(nx, nz))
for ix in range(nx):
for iz in range(nz):
idxs = grid[ix, iz, 0]
if len(idxs) < level:
val = 0
rate = 0.0
else:
val = list(ridxs).index(idxs[level - 1]) + 1
rate = grid[ix, iz, 1][level - 1]
gidx[ix, iz] = val
grate[ix, iz] = rate
return gidx, grate
[docs]
class DataLineObs(DataLine):
"""
Holds the observational data for one line.
Differently to DataLine DataLineObs does not care about
multiple inclinations. So simply some setter and getters are used
to fill the list quantities (first entry).
Attributes
----------
"""
def __init__(self, flux, flux_err, fwhm, fwhm_err, flag):
super(DataLineObs, self).__init__()
self.flux = flux
self.flux_err = flux_err
self.fwhm = fwhm
self.fwhm_err = fwhm_err
self.flag = flag.lower()
self.profile = None
self.profileErr = None
@property
def flux(self):
return self._fluxs[0]
@flux.setter
def flux(self, flux):
self._fluxs.append(flux)
@property
def profile(self):
# FIXME: Check what happens if _profiles is empty
return self._profiles[0]
@profile.setter
def profile(self, value):
# Not very nice, but per definition there can be only one line profile
# from the observations (this avoids multiple entries)
if len(self._profiles) > 0:
self._profiles[0] = value
else:
self._profiles.append(value)
[docs]
class Data_ProDiMo(object):
"""
Data container for most of the output produced by |prodimo|.
The class also includes some convenience functions and derives/calculates
some additional quantities not directly included in the |prodimo| output.
.. todo::
- maybe put all the (real) observational data into one object (e.g. also the lines) e.g.DataObservations
- maybe also have something like DataObservables (hold, SED, imaged, lines etc.)
"""
def __init__(self, name: str):
"""
Parameters
----------
name : string
The name of the model (can be empty).
Attributes
----------
"""
self.name: str = name
""" : The name of the model (can be empty). """
self.directory: str | None = None
""" : The directory from which the model was read. Is e.g. set by :func:`~prodimopy.read.read_prodimo`. Can be a relative path. """
self._fpFlineEstimates = None # The path to the FlineEstimates.out File
self._versionFlineEstimates = None # the version of the formation of FlineEstimates.out
self.params: ModelParameters
""" : Dictionary that allows to access the models Parameters from Parameter.out. """
self.nx: int
""" : The number of spatial grid points in the x (radial) direction """
self.nz: int
""" : The number of spatial grid points in the z (vertical) direction """
self.nspec: int | None = None # allow None, mainly for test purposes
""" : The number of chemical species included in the model. """
self.nheat: int
""" : The number of heating processes included in the model. """
self.ncool: int
""" : The number of cooling processes included in the model. """
self.p_dust_to_gas: float
""" : The global dust to gas mass ratio (single value, given Parameter). """
self.p_v_turb: float
""" : The global turbulent velocity (single value). *UNIT:* |kms^-1| """
self.p_rho_grain: float
""" : The global grain mass density (the density of one dust grain). *UNIT:* |gcm^-3|. """
self.p_mdisk: float
""" : The disk mass in solar units. Is taken from ProDiMo.out. Represents the parameter value.
Please note if also an envelope is included or the structure comes from a 1D or 2D interface this value is not the actually disk mass.
"""
self.td_fileIdx: int | None = None
""" : The file index (prefix) in case of a time-dependent chemistry model (Starts at 1). """
self.age: float = np.inf
""" : The age of the model in years. Is taken from ``Parameter.out``. ``np.inf`` for steady-state model. *UNIT:* years. """
self.x: NDArray[np.float64]
""" : The x coordinates (radial direction). *UNIT:* au, *DIMS:* (nx,nz) """
self.z: NDArray[np.float64]
""" : The z coordinates (vertical direction). *UNIT:* au, *DIMS:* (nx,nz) """
self.rhog: TGridfloat
""" : The gas density. *UNIT:* |gcm^-3|, *DIMS:* (nx,nz) """
self.rhod: TGridfloat
""" : The dust density. *UNIT:* |gcm^-3|, *DIMS:* (nx,nz) """
self.d2g: TGridfloat
""" : The dust to gas mass ratio from |prodimo| *UNIT:* , *DIMS:* (nx,nz)"""
self.nHtot: TGridfloat
""" : The total hydrogen number density. *UNIT:* |cm^-3|, *DIMS:* (nx,nz) """
self.muH: float
""" : The conversion constant from `nHtot` to `rhog`. It is assumed that it is constant throughout the disk.
It is given by ``rhog/nHtot``. *UNIT:* g.
"""
self.NHver: TGridfloat
""" : Vertical total hydrogen column density. ``nHtot`` is integrated from the disk surface to the midplane at each radial grid point.
The intermediate results are stored at each grid point. For example ``NHver[:,0]`` gives the total column density as a function of radius.
*UNIT:* |cm^-2|, *DIMS:* (nx,nz).
"""
self.NHrad: TGridfloat
""" : Radial total hydrogen column density. Integrated along radial rays, starting from the star.
Otherwise same behaviour as :attr:`~prodimopy.read.Data_ProDiMo.NHver`. *UNIT:* |cm^-2|, *DIMS:* (nx,nz)
"""
self.nd: TGridfloat
""" : The dust number density. *UNIT:* |cm^-3|, *DIMS:* (nx,nz). """
self.tg: TGridfloat
""" : The gas temperature. *UNIT:* K, *DIMS:* (nx,nz). """
self.td: TGridfloat
""" : The dust temperature. *UNIT:* K, *DIMS:* (nx,nz). """
self.pressure: TGridfloat
""" : The gas pressure. *UNIT:* |ergcm^-3|, *DIMS:* (nx,nz). """
self.soundspeed: TGridfloat
""" : The isothermal sound speed. *UNIT:* |kms^-1|, *DIMS:* (nx,nz). """
self.velocity: TGridfloat
""" : The velocity field (vector) given as vx,vy,vz. *UNIT:* |kms^-1|, *DIMS:* (nx,nz,3). """
self.damean: TGridfloat
""" : The mean dust particle radius. Is defined as `<a3>**(1/3)`. *UNIT:* micron, *DIMS:* (nx,nz)."""
self.da2mean: TGridfloat
""" : The surface weighted mean dust particle radius. *UNIT:* micron, *DIMS:* (nx,nz) """
self.dNlayers: TGridfloat
""" : The number of ice layers on the dust grains. *DIMS:* (nx,nz). """
self.Hx: TGridfloat
""" : The X-ray energy deposition rate per hydrogen nuclei. *UNIT:* erg <H>\\ :sup:`-1`, *DIMS:* (nx,nz). """
self.zetaX: TGridfloat
""" : X-ray ionisation rate per hydrogen nuclei. *UNIT:* |s^-1|, *DIMS:* (nx,nz). """
self.zetaCR: TGridfloat
""" : Cosmic-ray ionisation rate per molecular hydrogen (H2) *UNIT:* |s^-1|, *DIMS:* (nx,nz). """
self.zetaSTCR: TGridfloat
""" : Stellar energetic particle ionisation rate per H2. *UNIT:* |s^-1|, *DIMS:* (nx,nz). """
self.tauX1: TGridfloat
""" : Radial optical depth at 1 keV (for X-rays). *DIMS:* (nx,nz). """
self.tauX5: TGridfloat
""" : Radial optical depth at 5 keV (for X-rays). *DIMS:* (nx,nz). """
self.tauX10: TGridfloat
""" : Radial optical depth at 10 keV (for X-rays). *DIMS:* (nx,nz). """
self.AVrad: TGridfloat
""" : Radial visual extinction (measured from the star outwards). *DIMS:* (nx,nz). """
self.AVver: TGridfloat
""" : Vertical visual extinction (measured from the disk surface to the mid-plane). *UNIT:*, *DIMS:* (nx,nz). """
self.AV: TGridfloat
""" : Given by ``min([AVver[ix,iz],AVrad[ix,iz],AVrad[nx-1,iz]-AVrad[ix,iz]])``, hence the lowest visual extinction at a certain point.
It is assumed radiation can escape either vertically upwards, radially inwards or radially outwards. *DIMS:* (nx,nz).
"""
self.nlam: int
""" : The number of wavelength bands used in the continuum radiative transfer. """
self.lams: NDArray[np.float64]
""" : The band wavelengths (centers) considered in the radiative transfer. *UNIT:* microns, *DIMS:* (nlam). """
self.lambs: NDArray[np.float64] | None = None
""" : The band wavelengths (edges) considered in the radiative transfer. *UNIT:* microns, *DIMS:* (nlam+1).
This can be `None` as the edges wavelengths are only available with XrayRT or gasRT mode.
"""
self.chi: NDArray[np.float64]
""" : Geometrical UV radiation field in units of the Drain field. *UNIT:* Draine field, *DIMS:* (nx,nz). """
self.chi1_shielded: NDArray[np.float64]
""" : The shielded UV radiation field (i.e. self-shielding is taken into account). *UNIT:* Draine field 91.2-205 nm, *DIMS:* (nx,nz). """
self.chi1_unshielded: NDArray[np.float64]
""" : The unshielded UV radiation field (no self-shielding). *UNIT:* Draine field 91.2-205 nm, *DIMS:* (nx,nz). """
self.chi2_shielded: NDArray[np.float64]
""" : The shielded UV radiation field (i.e. self-shielding is taken into account). *UNIT:* Habing field 91.2-111nm, *DIMS:* (nx,nz). """
self.chi2_unshielded: NDArray[np.float64]
""" : The unshielded UV radiation field (no self-shielding). *UNIT:* Habing field 91.2-111 nm, *DIMS:* (nx,nz). """
self.chiRT: NDArray[np.float64]
""" : UV radiation field as properly calculated in the radiative transfer, in units of the Drain field. *UNIT:* Draine field, *DIMS:* (nx,nz). """
self.kappaRos: TGridfloat | None = None
""" : Rosseland mean opacity. In case of gas radiative transfer for the dust plus the gas. *UNIT:* |cm^-1|, *DIMS:* (nx,nz). """
self.tauchem: TGridfloat
""" : Chemical timescale (steady-state) *UNIT:* yr, *DIMS:* (nx,nz). """
self.taucool: TGridfloat
""" : Cooling timescale. *UNIT:* yr, *DIMS:* (nx,nz). """
self.taudiff: TGridfloat
""" : Vertical radiative diffusion timescale (using the Rosseland mean opacities). *UNIT:* yr, *DIMS:* (nx,nz). """
self.spnames: typing.Dict[str, int]
""" : Dictionary providing the index of a particular species (e.g. spnames["CO"]). This index can be used for arrays having an species dimension (like nmol).
The electron is included. *DIMS:* (nspec). """
self.solved_chem: NDArray[np.int32] # FIXME: I think the doc is not up-to-date any more
""" : Flag for chemistry solver (values, 0,1,2) (see |prodimo| code). 1 means everything okay, 0 failure, 2 time-dependent step needed. *DIMS:* (nx,nz). """
self.rateH2form: TGridfloat
""" : The H2 formation ratio. *UNIT:* |s^-1|, *DIMS:* (nx,nz). """
self.rateH2diss: np.ndarray[tuple[int, int, int], np.float64]
""" : The different H2 dissociation rates. *UNIT:* |s^-1|, *DIMS:* (nx,nz,3). """
self.isoratio_12CO13CO: TGridfloat | None = None
""" : Isotope ratio of 12CO to 13CO, if it was estimated in the model. *DIMS:* (nx,nz) """
self.heat_names: list[str]
""" All the names of the heating processes. """
self.cool_names: list[str]
""" All the names of the cooling processes. """
self.heat_mainidx: NDArray[np.int32]
""" : Index of the main heating process at the given grid point. *UNIT:* , *DIMS:* (nx,nz). """
self.cool_mainidx: NDArray[np.int32]
""" : Index of the main cooling process at the given grid point. *UNIT:* , *DIMS:* (nx,nz). """
self.lineEstimates: list[DataLineEstimate] | None = None
""" : All the line estimates from FlineEstimates.out. Each spectral line in FlineEstimates corresponds to one :class:`prodimopy.read.DataLineEstimate` object. """
self.lines: list[DataLine] | None = None
""" : All the spectral lines from `line_flux.out` (proper Line transfer). Each spectral line in `line_flux.out` is represented by one :class:`prodimopy.read.DataLine` object.
This also includes multiple inclinations.
"""
self._sed: DataSED | None = None
""" : The Spectral Energy Distribution for the model (SED) as calculated in the radiative transfer with ray tracing. Can only be accessed via getter setter
to deal with inclinations. See :class:`prodimopy.read.DataSED` for details.
"""
self.contImages: DataContinuumImages | None = None
""" : Holds the continuum images data (image.out) if available. The full images are only read if requested for a particular wavelength. """
self.star: DataStar | None = None
""" : The stellar properies including the (unattenuated) input spectrum. """
self.gas: DataGas | None = None
""" : Holds various properties of the gas component (e.g. opacities). """
self.dust: DataDust | None = None
""" : Holds various properties of the dust component (e.g. opacities). """
self.env_dust: DataDust | None = None
""" : Holds various properties of the dust component (e.g. opacities) of the envelope. Only relevant if |prodimo| is used in the envelope mode. """
self.dust_charge_mean: NDArray[np.float64] | None = None
""" : The mean dust charge, if it was calculated. *DIMS:* (nx,nz). """
self.dust_charge_min: NDArray[np.float64] | None = None
""" : The minimum dust charge, if it was calculated. *DIMS:* (nx,nz). """
self.dust_charge_max: NDArray[np.float64] | None = None
""" : The max dust charge, if it was calculated. *DIMS:* (nx,nz). """
self.elements: DataElements
""" : Holds the element abundances (Elements.in) """
self.species: DataSpecies
""" : Holds the initial species data (Species.in). """
self.sedObs: DataContinuumObs | None = None
""" : Holds the provided SED observations (photometry, spectra, extinction etc.)."""
self.lineObs: list[DataLineObs] | None = None
""" : Holds the provide line observations (e.g. LINEObs.dat and line profiles). """
self.FLiTsSpec: DataFLiTsSpec | None = None
""" : Holds the FLiTs spectrum if it exists. """
self._log: bool = True
""" : Allows to switch off some log statements (not consistently implemented yet). """
self._pAUtocm: float = 1.495978700e13
""" : Conversion factor from AU to cm taken directly from |prodimo|. """
#
# these are some cache variables for lazy initialization. Using them allows to do
# conversions/calculations only if the quantities are accessed (used)
#
self._cool_cache: NDArray = None
self._cool: NDArray = None
self._heat_cache: NDArray = None
self._heat: NDArray = None
self._radFields_cache: NDArray = None
self._radFields: NDArray = None
self._nmol_cache: NDArray = None
self._nmol: NDArray = None
self._sdg: NDArray = None
self._sdd: NDArray = None
self._vol: NDArray = None
self._cdnmol: NDArray = None
self._rcdnmol: NDArray = None
self._chemnet: pcnet.ReactionNetworkPout = None
@property
@deprecated("Use model.star.mass instead.")
def mstar(self) -> float:
""": The stellar mass in solar units. Is taken from ProDiMo.out
Deprecated: use :attr:`~prodimopy.read.DataStar.mass` instead.
"""
return self.star.mass
@property
@deprecated("Use model.star instead.")
def starSpec(self) -> DataStar:
""": The stellar properties including the (unattenuated) input spectrum.
Deprecated: use :attr:`~prodimopy.read.Data_ProDiMo.star` instead.
"""
return self.star
@property
def sed(self) -> DataSED | None:
""": Not sure if this is the best solution, but need a getter. It returns the SED for the current inclination."""
return self._sed
@sed.setter
def sed(self, value: DataSED):
"""Set the SED for the current inclination"""
self._sed = value
@property
def nmol(self) -> NDArray[np.float64]:
""": Number densities of all chemical species (mainly molecules). *UNIT:* |cm^-3|, *DIMS:* (nx,nz,nspec)."""
if self._nmol_cache is not None:
self._nmol = np.array(self._nmol_cache, dtype=np.float64)
self._nmol_cache = None # remove the string cache
return self._nmol
@nmol.setter
def nmol(self, value):
self._nmol = value
@property
def cdnmol(self) -> NDArray[np.float64]:
""": Vertical column number densities for each chemical species at each point in the disk. Integrated from the surface to the midplane at each radial grid point.
*UNIT:* |cm^-2|, *DIMS:* (nx,nz,nspec).
"""
if self._cdnmol is None:
_calc_cdnmol(self)
return self._cdnmol
@cdnmol.setter
def cdnmol(self, value):
self._cdnmol = value
@property
def rcdnmol(self) -> NDArray[np.float64]:
""": Radial column number densities for each species at each point in the disk. Integrated from the star outwards along fixed radial rays given by the vertical grid.
*UNIT:* |cm^-2|, *DIMS:* (nx,nz,nspec)
"""
if self._rcdnmol is None:
_calc_rcdnmol(self)
return self._rcdnmol
@rcdnmol.setter
def rcdnmol(self, value):
self._rcdnmol = value
@property
def sdg(self) -> NDArray[np.float64]:
""": The gas vertical surface density. This is for only one half of the disk (i.e. from z=0 to z+), like in |prodimo|. For the total surface density multiply by two.
*UNIT:* |gcm^-2|, *DIMS:* (nx,nz)
"""
if self._sdg is None:
calc_surfd(self)
return self._sdg
@sdg.setter
def sdg(self, value: NDArray):
self._sdg = value
@property
def sdd(self) -> NDArray[np.float64]:
""": The dust vertical surface density. This is for only one half of the disk (i.e. from z=0 to z+), like in |prodimo|. For the total surface density multiply by two.
*UNIT:* |gcm^-2|, *DIMS:* (nx,nz)
"""
if self._sdd is None:
calc_surfd(self)
return self._sdd
@sdd.setter
def sdd(self, value):
self._sdd = value
@property
def mgasdisk(self) -> float:
"""
The total disk gas mass (integrated rhog).
Returns
-------
float
Disk gas mass in g.
"""
return np.sum(self.rhog*self.vol)
@property
def mdustdisk(self) -> float:
"""
The total disk dust mass (integrated rhod).
Returns
-------
float
Disk dust mass in g.
"""
return np.sum(self.rhod*self.vol)
@property
def cool(self) -> NDArray[np.float64]:
""": Cooling rates for the various cooling processes. *UNIT:* |ergcm^-3s^-1| *DIMS:* (nx,nz,ncool)."""
if self._cool_cache is not None:
self._cool = np.array(self._cool_cache, dtype=float)
self._cool_cache = None # remove the string cache
return self._cool
@cool.setter
def cool(self, value):
self._cool = value
@property
def heat(self) -> NDArray[np.float64]:
""": Heating rates for the various heating processes. *UNIT:* |ergcm^-3s^-1| *DIMS:* (nx,nz,nheat)."""
if self._heat_cache is not None:
self._heat = np.array(self._heat_cache, dtype=np.float64)
self._heat_cache = None # remove the string cache
return self._heat
@heat.setter
def heat(self, value):
self._heat = value
@property
def radFields(self) -> NDArray[np.float64]:
""" : Radiation field (mean intensity) for each wavelength band. *UNIT:* erg |s^-1| |cm^-2| |sr^-1| |Hz^-1|, *DIMS:* (nx,nz,nlam)."""
if self._radFields_cache is not None:
self._radFields = np.array(self._radFields_cache, dtype=np.float64)
self._radFields_cache = None # remove the string cache now
return self._radFields
@radFields.setter
def radFields(self, value):
self._radFields = value
@property
def vol(self) -> NDArray[np.float64]:
""": The volume for each grid point. *UNIT:* |cm^3|, *DIMS:* (nx,nz)."""
if self._vol is None:
_calc_vol(self)
return self._vol
@vol.setter
def vol(self, value):
self._vol = value
@property
def chemnet(self) -> pcnet.ReactionNetworkPout:
"""The chemical reactions of the model. Holds what is in ``Reactions.out``. Is only read if needed."""
if self._chemnet is None:
self._chemnet = pcnet.ReactionNetworkPout(name=self.name, modeldir=self.directory)
return self._chemnet
@chemnet.setter
def chemnet(self, value: pcnet.ReactionNetworkPout):
self._chemnet = value
@property
def velocity_xz(self) -> NDArray[np.float64]:
""":
The absolute xz-component of the :attr:`velocity` field. *UNIT:* |kms^-1|, *DIMS:* (nx,nz).
"""
return np.sqrt((self.velocity[:, :, 0]) ** 2 + (self.velocity[:, :, 2]) ** 2)
def __str__(self):
output = "Info ProDiMo.out: \n"
output += "NX: " + str(self.nx) + " NZ: " + str(self.nz) + " NSPEC: " + str(self.nspec)
output += (
" NLAM: "
+ str(self.nlam)
+ " NCOOL: "
+ str(self.ncool)
+ " NHEAT: "
+ str(self.nheat)
)
output += "\n"
output += "p_dust_to_gas: " + str(self.p_dust_to_gas)
return output
def _getLineIdx(self, wl, ident=None):
if self.lines is None:
return None
wls = np.array([line.wl for line in self.lines])
if ident is not None:
linestmp = [line for line in self.lines if line.ident == ident]
if linestmp is not None and len(linestmp) > 0:
wlstmp = np.array([line.wl for line in linestmp])
itmp = np.argmin(abs(wlstmp[:] - wl))
# get the index of the whole line array, trusting python.
idx = self.lines.index(linestmp[itmp])
# super security check, should not happen so throw and exception
if self.lines[idx].ident != ident:
raise ValueError(
"Something is wrong found: ident",
self.lines[idx].ident,
" and wl ",
self.lines[idx].wl,
" for ",
ident,
wl,
)
else:
return idx
else:
print("WARN: No line found with ident", ident, " and wl ", wl)
return None
else:
idx = np.argmin(abs(wls[:] - wl))
if (abs(wls[idx] - wl) / wl) > 0.01:
print("WARN: No line found within 1% of the given wavelength:", wl)
return None
else:
return idx
[docs]
def sedinc(self, incidx: int = 0) -> DataSED:
"""
Get the SED for for the given inclination index ``iinc``.
Parameters
----------
incidx :
The inclination index for which the SED should be retrieved.
Returns
-------
DataSED
The SED for the specified inclination index.
"""
assert self._sed is not None
nincs = len(self._sed._inclinations)
if nincs == 1:
self._sed._incidx = 0
elif incidx >= nincs:
self._sed._incidx = nincs - 1
else:
self._sed._incidx = incidx
return self._sed
[docs]
def getSEDAnaMask(self, lam):
"""
Returns a numpy mask where only the grid points outside the emission
origin area for the given wavelength (continuum) are marked as invalid.
This mask can be used in e.g. :func:`~prodimopy.read.Data_ProDiMo.avg_quantity`
.. warning::
in case of optically thin emission only the points of one half
of the disk are considered. In that case an average quantity of this
box(mask) will not be completely correct.
Parameters
----------
lam : float
the wavelength for which the emission origin region should be determined:
UNITS: micron
Returns
-------
array_like(float,ndim=2)
Numpy mask with *DIMS:* (nx,nz)
"""
assert self.sed is not None
sedAna = self.sed.sedAna
x15 = interp1d(sedAna.lams, sedAna.r15, bounds_error=False, fill_value=0.0, kind="linear")(
lam
)
x85 = interp1d(sedAna.lams, sedAna.r85, bounds_error=False, fill_value=0.0, kind="linear")(
lam
)
mask = np.ones(shape=(self.nx, self.nz))
for ix in range(self.nx):
if self.x[ix, 0] >= x15 and self.x[ix, 0] <= x85:
z85 = interp1d(
sedAna.lams,
sedAna.z85[:, ix],
bounds_error=False,
fill_value=0.0,
kind="linear",
)(lam)
z15 = interp1d(
sedAna.lams,
sedAna.z15[:, ix],
bounds_error=False,
fill_value=0.0,
kind="linear",
)(lam)
for iz in range(self.nz):
if self.z[ix, iz] <= z15 and self.z[ix, iz] >= z85:
mask[ix, iz] = 0
return mask
[docs]
def getLine(self, wl: float, ident: str = None, incidx: int = 0) -> DataLine | None:
"""
Returns the spectral line (from the line RT) closest to the given wavelength.
Parameters
----------
wl :
The wavelength which is used for the search. *UNIT:* micron.
ident :
A line identification string which should also be considered for the search.
Please note that it is not the chemical species name. For example if you have to lines
for the same CO transition but one for NLTE (``ident=CO``) and one for LTE (``ident=CO_lte``)
the species will be the same in both. To properly select those lines you need to
provided the ident as the wl is identical.
incidx :
select the inclination for the line. (default: 0) . 0 means
the first one. If only one inclination exists always this one will be used
(i.e. the value of incidx is ignored).
Returns
-------
DataLine
Returns `None` if no lines are included in the model.
"""
idx = self._getLineIdx(wl, ident=ident)
if idx is None:
return None
else:
assert self.lines is not None
line = self.lines[idx]
if len(line._inclinations) == 1:
line._incidx = 0
else:
# only here I want to really set it
line._incidx = incidx
return line
[docs]
def selectLines(self, ident: str) -> list[DataLine] | None:
"""
Returns a list of all line fluxes included in the line transfer, for the given line ident.
Parameters
----------
ident :
The line identification (species name) as defined in |prodimo|.
The ident is not necessarily equal to the underlying chemical species
name (e.g. isotopologues, ortho-para, or cases like N2H+ and HN2+)
Returns
-------
list[DataLine] :
All lines for the given ident, or an empty list if nothing was found.
``None`` if no lines are available at all.
"""
if self.lines is None:
return None
lines = list()
for le in self.lines:
if le.ident == ident:
lines.append(le)
return lines
[docs]
def gen_specFromLineEstimates(
self,
wlrange=[10, 15],
ident: str | list[str] | None = None,
specR=3000,
unit="W",
contOnly=False,
noCont=False,
faceon: bool =False,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""
Generates a "Spectrum" from the line estimates results of |prodimo| and
convolves it to the given spectral resolution.
If the SED was also calculated the line fluxes will be added to the continuum,
if not a continuum of zero flux is assumed.
This routine can become very slow, depending on the number of lines
within a given wavelength range. It can be more efficient to produce smaller
chunks with not so many lines.
Parameters
----------
wlrange : array_like
Generate the spectrum in the wavelength range [start,end]
Default: `[10,15]` Units: micron
ident :
only the lines with this ident (like given in the line estimates) are
considered. Default: `None` (all lines in the given wlrange are considered)
if ident is a list of strings, all those molecules will be considered
specR : int
the desired spectral resolution. If `None` a spectrum with simple the
line fluxes added (i.e. as "delta function") is returned.
Default: `3000`
unit : str
desired unit for the output. Current choices `W` (W/m^2/Hz) or 'Jy' (Jansky).
`W` is the default option.
contOnly : boolean
only do it for the continuum (do not add any lines).
Default: `False`
noCont : boolean
assume zero continuum
Default: `False`
faceon : bool
whether to use the face-on SED for the continuum calculation.
Default: `False`
Returns
-------
(tuple):
tuple containing: wls(array_like): array of wavelength points in micron,
flux(array_like): flux values for each wavelength point in |Wm^-2Hz^-1|
or Jy (depending on the `unit` parameter)
"""
import astropy.convolution as conv
# one can call the routine like this ([5,10],3000). However, 3000 will be taken as the ident
# try to catch this case
if ident is not None and isinstance(ident, (int, float)):
specR = ident
ident = None
if isinstance(ident, str) or ident is None:
ident = [ident]
startT = timer()
lmin = wlrange[0] / 1.0e4
lmax = wlrange[1] / 1.0e4
R = 1.0e6
if specR is not None and specR > (0.1 * R):
print(
"WARN: requested spectral resolution is too high use " + str(0.1 * R) + " instead"
)
# technical spectral resolution
# determine the number of points required
del_loglam = np.log10(1.0 + 1.0 / R) # log(lam+dlam)-log(lam)
N = 1 + int(np.log10(lmax / lmin) / del_loglam)
mwlsline = np.logspace(np.log10(lmin), np.log10(lmax), N)
if self.sed is None or noCont:
contfluxline = mwlsline * 0.0 # no continuum
elif faceon:
contfluxline = np.interp(mwlsline, self.sed.lam / 1.0e4, self.sed.fnuFaceonErg)
else:
contfluxline = np.interp(mwlsline, self.sed.lam / 1.0e4, self.sed.fnuErg)
# do everything in cgs units
confac = (1.0 * (u.W / (u.m**2))).to(u.erg / u.s / (u.cm**2)).value
lineest=list()
for id in ident:
lineest+= self.selectLineEstimates(ident=id, wlrange=wlrange)
# print("INFO: gen_specFromLineEstimates: build spectrum for "+
# str(len(lineest))+" lines ...")
# this loop is taking most of the time , not the convolution.
# I guess that could still be faster, but that might require a lot more
# memory
# from tqdm import tqdm
# for line in tqdm(lineest):
tonu = const.c.cgs.value / R
for line in tqdm(lineest, "INFO: gen_specFromLineEstimates: build spectrum: "):
wlcm = line.wl / 1.0e4
# Find the closes wavelength point in the new grid.
# idx=np.argmin(np.abs(mwlsline-wlcm))
# this does the same as above but seems to be 10 times faster
idx = np.argmax(mwlsline > (wlcm))
if (mwlsline[idx] - wlcm) > (wlcm - mwlsline[idx - 1]):
idx -= 1
# like in the idl script
# R=mwlsline[idx]/(mwlsline[idx]-mwlsline[idx-1])
# print(R)
# just use delta functions -> line profile is a delta function
if not contOnly:
if specR is None:
contfluxline[idx] = contfluxline[idx] + (line.flux * confac)
else:
dnu = tonu / wlcm
contfluxline[idx] = contfluxline[idx] + (line.flux * confac) / dnu
print("INFO: gen_specFromLineEstimates: convolve spectrum ...")
if specR is None:
FWHMpix = 1.0
mwlslinez = mwlsline
contfluxline_conv = contfluxline
else:
# in the idl script this is interpreted as the FWHM,
# but the convolution routine wants stddev use relation
# FWHM=2*sqrt(2ln2)*stddev=2.355/stddev
# this should than be consistent with the result from the
# ProDiMo idl script
FWHMpix = R / specR
g = conv.Gaussian1DKernel(stddev=FWHMpix / 2.355, factor=7)
# Convolve data
contfluxline_conv = conv.convolve(contfluxline, g)
# print(z)
# remove beginning and end ... strange values
cut = 2 * int(FWHMpix)
contfluxline_conv = contfluxline_conv[cut : (len(contfluxline_conv) - cut)]
mwlslinez = mwlsline[cut : (len(mwlsline) - cut)]
# now downgrade the grid to something reasonable
outwls = np.linspace(
np.min(mwlslinez), np.max(mwlslinez), int(len(mwlsline) / FWHMpix) * 5
)
outflux = np.interp(outwls, mwlslinez, contfluxline_conv)
outflux = outflux / confac
if unit == "Jy":
outflux = (outflux * u.W / u.m**2 / u.Hz).to(u.Jy).value
print("INFO: time: ", "{:4.2f}".format(timer() - startT) + " s")
return outwls * 1.0e4, outflux
[docs]
def getLineEstimate(self, ident, wl):
"""
Finds and returns the line estimate for the given ident which is closest
to the given wavelength. If the ident is unknown None is returned. If the ident
exist the routine always returns an line estimate object.
Parameters
----------
ident : string
The line identification string. Note that the ident is not necessarily equal to the
corresponding chemical species.
wl : float
the wavelength which is used for the search. *UNIT:* micron.
Returns
-------
:class:`prodimopy.read.DataLineEstimate`
Returns `None` if the line estimate is not found.
Notes
-----
The parameters of this method are not consistent
with :meth:`~prodimopy.read.Data_ProDiMo.getLine`. This might be confusing.
However, there is a reason for this. The number of
included line estimates in a |prodimo| model can be huge and just searching
with the wavelength might become slow. However, this probably can be improved.
To speed up the reading of `FlineEstimate.out` the extra radial info
(see :class:`~prodimopy.read.DataLineEstimateRInfo` of all the line estimates
is not read. However, in this routine it is read for the single line estimate
found. One should use this routine to access the radial info for a single line estimate.
"""
found = -1
i = 0
mindiff = 1.0e20
if self.lineEstimates is None:
print("WARN: No lineEstimates are included (or not read) for this model.")
return None
# print ident, wl
for le in self.lineEstimates:
if le.ident == ident:
diff = abs(le.wl - wl)
if diff < mindiff:
found = i
mindiff = diff
i = i + 1
# that means the ident (species) was not found
if found == -1:
print("ERROR: Could not find any line estimates for ident: ", ident)
return None
return self.lineEstimates[found]
[docs]
def selectLineEstimates(
self, ident: str | None, wlrange: list[float, float] | None = None
) -> list[DataLineEstimate]:
"""
Returns a list of all line estimates (i.e. all transitions) for the
given line ident and/or in the given wavelength range.
Parameters
----------
ident :
The line identification (species name) as defined in |prodimo|.
The ident is not necessarily equal to the underlying chemical species
name (e.g. isotopologues, ortho-para, or cases like N2H+ and HN2+).
If ``wlrange`` is set ident can be ``None`` in that case all lines in the given
wavelength range are returned.
wlrange : All lines in the given wavelength range [start,end] and the given ident
are returned. if the ident is `None` all lines are returned.
Default: `None` Units: micron
Returns
-------
list(:class:`prodimopy.read.DataLineEstimate`) :
List of :class:`prodimopy.read.DataLineEstimate` objects, or empty list if nothing was found.
Notes
-----
In this routine the additional radial information of the line estimate
(see :class:`~prodimopy.read.DataLineEstimateRInfo`) is not read.
Examples
--------
.. code-block:: python
# select all line estimates for CO
lests=model.selectLineEstimates("CO")
# select only the 1300 micron CO line
lests=model.selectLineEstimates("CO",wlrange=[1300,1310])
# select all linest in the given wavelength range
lests=model.selectLineEstimates(None,wlrange=[1200,1400])
# print all of them to see what we got
for lest in lests:
print(lest)
"""
assert self.lineEstimates is not None
lines = list()
if wlrange is None:
for le in self.lineEstimates:
if le.ident == ident:
lines.append(le)
else:
# TODO: maybe this can be done better/faster
for le in self.lineEstimates:
if le.wl >= wlrange[0] and le.wl <= wlrange[1]:
if ident is None or le.ident == ident:
lines.append(le)
if len(lines) == 0:
print("ERROR: Could not find any line estimates ...")
return lines
[docs]
def getLineOriginMask(self, lineEstimate: DataLineEstimate) -> NDArray[np.float64]:
"""
Returns a numpy mask where the grid points outside the emission
origin area for the given ``lineEstimate`` are marked as invalid.
This mask can be used in e.g. :func:`~prodimopy.read.Data_ProDiMo.avg_quantity`
Parameters
----------
lineEstimate :
A line estimate object for which the mask should be created.
Returns
-------
NDArray[np.float64]
Numpy mask with *DIMS:* (nx,nz)
"""
fcfluxes = np.array([x.Fcolumn for x in lineEstimate.rInfo])
Fcum = fcfluxes[:]
for i in range(1, self.nx):
Fcum[i] = Fcum[i - 1] + fcfluxes[i]
interx = interp1d(Fcum / Fcum[-1], self.x[:, 0])
x15 = interx(0.15)
x85 = interx(0.85)
mask = np.ones(shape=(self.nx, self.nz))
for ix in range(self.nx):
if self.x[ix, 0] >= x15 and self.x[ix, 0] <= x85:
for iz in range(self.nz):
rinfo = lineEstimate.rInfo[ix]
# print(rinfo.z15,rinfo.z85)
if self.z[ix, iz] <= rinfo.z15 and self.z[ix, iz] >= rinfo.z85:
mask[ix, iz] = 0
return mask
[docs]
def getAbun(self, spname):
"""
Returns the abundance for a given species.
Parameters
----------
spname : string
The name of the chemical species as define in |prodimo|.
Returns
-------
array_like(float,ndim=2)
an array of dimension [nx,nz] with species abundance or `None` if the species was not found.
Notes
-----
The abundance is given relative to the total hydrogen nuclei density
(i.e. ``nmol(spname)/nHtot``)
A warning is printed in case the species was not found in spnames.
"""
if spname not in self.spnames:
print("WARN: getAbun: Species " + spname + " not found.")
return None
return self.nmol[:, :, self.spnames[spname]] / self.nHtot
[docs]
def get_TotSpeciesMass(self, spname=None):
"""
Returns the total mass (integrated over the whole disk) for all or one given species.
The total mass is calculated by summing up the product of the number density and the volume
at each grid point and multiplying it with the mass of one molecule.
A warning is printed in case the species was not found in spnames.
Parameters
----------
spname : string
The name of the chemical species as defined in |prodimo|.
If `None` an array with the species masses of all elements will be returned.
Returns
-------
float or array_like(float)
The total mass of the species in grams or an array with the masses of all species.
`None` if the species was not found.
"""
def tmass(spec, model):
return np.sum(
model.vol * (model.nmol[:, :, model.spnames[spec]] * model.species.mass[spec])
)
if spname is None:
all = list()
for spname in self.spnames:
all.append(tmass(spname, self))
return np.array(all)
else:
if spname not in self.spnames:
print("WARN: getTotSpeciesMass: Species " + spname + " not found.")
return None
return tmass(spname, self)
[docs]
def get_toomreQ(self, mstar=None):
"""
Returns the Toomre Q parameter as a function of radius.
(for the midplane).
Q is given by
Q(r)=k(r)*cs(r)/(pi * G * sigma(r))
for k we use the keplerian frequency, cs is the soundspeed (from the mdoel)
and sigma is the total gas surface density (both half's of the disk).
Parameters
----------
mstar : float
The stellar mass in solar units (optional).
If `None` the value from the model is taken.
Returns
-------
array_like(float,ndim=1)
an array of dimension [nx] with the Q param
"""
if mstar is None:
mstar = self.mstar
mstarc = (mstar * u.M_sun).cgs.value
grav = const.G.cgs.value
r = (self.x[:, 0] * u.au).cgs.value
# isothermal soundspeed
cs = (self.soundspeed[:, 0] * u.km / u.s).cgs.value
# surface density factor two for both half of the disks
sigma = 2.0 * (self.sdg[:, 0] * u.g / u.cm**2).cgs.value
# assume Keplerian rotation for Epicyclic frequency
kappa = np.sqrt(grav * mstarc / r**3)
Q = kappa * cs / (math.pi * grav * sigma)
return Q
[docs]
def get_VKep(self, mstar=None, type=2):
"""
Returns the Keplerian rotation velocity [km/s] as a function
of radius and height (same as in |prodimo|)
Type 1: vkep=sqrt((G* mstar) / r)
Type 2: vkep=sqrt((G* mstar * x**2) / r**3)
Type 3: with pressure gradient (see Rosenfeld+ 2013)
Parameters
----------
mstar : float
The stellar mass in solar units (optional).
If `None` the value from the model is taken.
Returns
-------
array_like(float,ndim=1)
Rotation velocity [km/s]
"""
if mstar is None:
mstar = self.mstar
mstarc = (mstar * u.M_sun).cgs.value
grav = const.G.cgs.value
tocm = ((1 * u.au).to(u.cm)).value
r = np.sqrt(self.x**2 + self.z**2) * tocm
if type == 1:
# FIXME: Should be radius, but to be consistent do it like this.
# but check again
vkep = np.sqrt(grav * mstarc / (self.x * tocm))
else:
vrot2 = (grav * mstarc * (self.x * tocm) ** 2) / r**3
if type == 2: # standard in ProDiMo
vkep = np.sqrt(vrot2)
elif type == 3: # include the pressure term
# pressuregradient like Rosenfeld+ 2013
dp = self.x * 0.0
for iz in range(self.nz):
# this is used in e.g. Rosenfeld et al 2013 and others
dp[:, iz] = np.gradient(self.pressure[:, iz], self.x[:, iz] * tocm)
# this we use in ProDiMo currently I think that is correct
# doesn't seem to much of a difference
# dp[:,iz]=np.gradient(self.pressure[:,iz],r[:,iz])
vkep = np.sqrt(vrot2 + (self.x * tocm) / self.rhog * dp)
else:
vkep = np.sqrt(vrot2)
vkep = ((vkep * u.cm / u.s).to(u.km / u.s)).value
return vkep
[docs]
def get_KeplerOmega(self, mstar=None):
"""
Returns the Keplerian orbital frequency [1/s]
(for the midplane).
omega=sqrt(G*mstar/r**3)
Parameters
----------
mstar : float
The stellar mass in solar units (optional).
If `None` the value from the model is taken.
Returns
-------
array_like(float,ndim=1)
Keplerian orbital frequency as function of radius [1/s]
"""
if mstar is None:
mstar = self.mstar
mstarc = (mstar * u.M_sun).cgs.value
grav = const.G.cgs.value
r = (self.x[:, 0] * u.au).cgs.value
# assume keplerian rotation for Epicyclic frequency
omega = np.sqrt(grav * mstarc / r**3)
return omega
[docs]
def int_ring(self, quantity, r1=None, r2=None):
"""
Integrates the given quantity (needs to be a function of r) from
rin to rout.
No interpolation is done. Simply the nearest neighbour for rin and rout
are taken.
The integration is done in cgs units and also the return value is in cgs units.
For example if the total gas surface density is passed this routine gives the disk mass.
(if r1=rin and r2=rout).
Parameters
----------
quantity : array_like(float,ndim=1)
the quantity to average. *DIMS:* (nx).
r1: float
inner radius [au].
optional DEFAULT: inner disk radius
r2: float
inner radius [au].
optional DEFAULT: outer disk radius
"""
if r1 is None:
r1idx = 0
else:
r1idx = np.argmin(np.abs(self.x[:, 0] - r1))
if r2 is None:
r2idx = self.nx - 1
else:
r2idx = np.argmin(np.abs(self.x[:, 0] - r2))
print(
"INFO: Integrate from "
+ "{:8.4f} ({:4d})".format(self.x[r1idx, 0], r1idx)
+ " au to "
+ "{:8.4f} ({:4d})".format(self.x[r2idx, 0], r2idx)
+ " au"
)
r = (self.x[:, 0] * u.au).to(u.cm).value
out = (
2.0
* math.pi
* integrate.trapezoid(
quantity[r1idx : r2idx + 1] * r[r1idx : r2idx + 1], x=r[r1idx : r2idx + 1]
)
)
return out
[docs]
def avg_quantity(self, quantity, weight="gmass", weightArray=None, mask=None):
"""
Calculates the weighted mean value for the given field (e.g. td).
Different weights can be used (see `weight` parameter)
Parameters
----------
quantity : array_like(float,ndim=2)
the quantity to average. *DIMS:* (nx,nz).
weight : str
option for the weight. Values: `gmass` (gas mass) `dmass` (dust mass) or
`vol` (Volume). DEFAULT: `gmass`
weightArray : array_like(float,ndim=2)
Same dimension as `quantity`. If `weightArray` is not `None` it is used
as the weight for calculating the mean value.
mask : array_like(boolean,ndim=2)
A mask with the same dimension as `quantity`. All elements with `True` are
masked, i.e. not considered in the average (see numpy masked_array).
For example one can pass the result of :meth:`~prodimopy.read.getLineOriginMask`
to calculate average quantities over the emitting region of a line.
Examples
--------
Here is an example for making a mass weighted average for the gas temperature in the main
line emitting region for the CO 1.3 mm line. `model` is the model data read with
:func:`~prodimopy.read.read_prodimo`.
.. code-block:: python
avgtg=model.avg_quantity(model.tg,weight="gmass",
mask=model.getLineOriginMask(model.getLineEstimate("CO",1300.)))
"""
vol = np.ma.masked_array(self.vol, mask=mask)
quantitym = np.ma.masked_array(quantity, mask=mask)
if weightArray is not None:
wA = np.ma.masked_array(weightArray, mask=mask)
mean = np.sum(np.multiply(wA, quantitym))
return mean / np.sum(wA)
else:
if weight == "vol":
mean = np.sum(np.multiply(vol, quantitym))
return mean / np.sum(vol)
else:
if weight == "dmass":
rho = np.ma.masked_array(self.rhod, mask=mask)
else:
rho = np.ma.masked_array(self.rhog, mask=mask)
mass = np.multiply(vol, rho)
mean = np.sum(np.multiply(mass, quantitym))
mean = mean / np.sum(mass)
return mean
[docs]
def analyse_chemistry(
self,
species: str,
to_txt: bool = True,
filenameChemistry: str = "chemanalysis.out",
screenout: bool = False,
) -> Chemistry:
"""
Routine to create a Chemistry analysis object for the given species, if the chemanalysis data is available.
Parameters
----------
species :
The species to analyse.
to_txt :
Write info about formation and destruction reactions for the selected molecule to a txt file.
filenameChemistry :
The name of the file that holds the reaction rates. Just in case it has a different name, usually one does not need to change that.
screenout :
If ``True`` all the form./dest. reactions are printed on the screen.
Returns
-------
Chemistry :
Object that holds all the required information and can be use for the plotting routines or for :func:`~prodimopy.read.reac_rates_ix_iz`.
"""
import astropy.io.fits as fits
chemistry = Chemistry(self.name)
start = timer()
# stores all the formation reaction indices and rates for each grid point
gridf = np.empty((self.nx, self.nz, 2), dtype=np.ndarray)
gridd = np.empty((self.nx, self.nz, 2), dtype=np.ndarray)
totfrate = np.zeros((self.nx, self.nz)) # total formation rate at each point)
totdrate = np.zeros((self.nx, self.nz)) # total formation rate at each point)
fidx = list() # indices of all unique formation reactions
didx = list() # indices of all unique destruction reactions
# check if already a fits file with the given.out name exits
fnamefits = filenameChemistry.replace(".out", ".fits")
if self.td_fileIdx is not None: # take the time-dependent file index from the model
ext = _td_fileIdx_ext(self.td_fileIdx)
fnamefits = fnamefits.replace(".fits", ext + ".fits")
if os.path.isfile(os.path.join(self.directory, fnamefits)):
# if so, use it
print("INFO: Using existing fits file for chemanalysis: ", fnamefits)
filenameChemistry = fnamefits
else:
print("INFO: convert existing txt format file to fits (that can take a bit) ... ")
# create an array that holds the reaction rates
rates = np.zeros(
shape=(self.nx, self.nz, len(self.chemnet.reactions)), dtype=np.float32
)
if self.td_fileIdx is not None:
# if it is a time-dependent model, the filenameChemistry is different
# not nice, but ext should be set already
filenameChemistry = filenameChemistry.replace(".out", ext + ".out")
fc = open(os.path.join(self.directory, filenameChemistry), "r")
fc.readline() # skip the first line
for line in fc:
ix, iz, dummy, ireac, rate = line.split()
rates[int(ix) - 1, int(iz) - 1, int(ireac) - 1] = float(rate)
fc.close()
hdu = fits.PrimaryHDU(
data=rates.T
) # transpose to have it in the same order as it would be from prodimo
hdu.writeto(os.path.join(self.directory, fnamefits), overwrite=True)
filenameChemistry = fnamefits
# that gives me simply all formation and destruction reactions for the given species
# those include the real ids (e.g. starts at 1) and not the zero-based python indices!!
fidx = np.array([x.id for x in self.chemnet.reactions if species in x.products])
didx = np.array([x.id for x in self.chemnet.reactions if species in x.reactants])
# now read all info from the fits file
cfits = fits.open(
os.path.join(self.directory, filenameChemistry),
do_not_scale_image_data=True,
memmap=True,
)
formrates = cfits[0].data[fidx - 1, :, :] # is read in reversed order for the dims
destrates = cfits[0].data[didx - 1, :, :]
formrates = formrates.T # transpose it to have it in nx,nz,ncreac ...
destrates = destrates.T # transpose it to have it in nx,nz,ncreac ...
cfits.close()
# sorted indices for the formation and destruction rates
formridx = np.flip(np.argsort(formrates, axis=2), axis=2) # reversed order
destridx = np.flip(np.argsort(destrates, axis=2), axis=2) # reversed order
for ix in range(self.nx):
for iz in range(self.nz):
gridf[ix, iz, 0] = fidx[formridx[ix, iz, :]] # formation reaction indices, sorted
gridf[ix, iz, 1] = formrates[ix, iz, formridx[ix, iz, :]]
gridd[ix, iz, 0] = didx[
destridx[ix, iz, :]
] # destruction reaction indices, sorted
gridd[ix, iz, 1] = destrates[ix, iz, destridx[ix, iz, :]]
totfrate[:, :] = np.sum(formrates, axis=2) # total formation rate at each point
totdrate[:, :] = np.sum(destrates, axis=2) # total destruction rate at each point
chemistry.species = species
chemistry.gridf = gridf
chemistry.gridd = gridd
chemistry.fridxs = fidx
chemistry.dridxs = didx
chemistry.totfrate = totfrate
chemistry.totdrate = totdrate
chemistry.model = self # reference to the underlying model
if to_txt:
output_chem_fname = os.path.join(
self.directory, "chemistry_reactions_" + species + ".txt"
)
if self.td_fileIdx is not None:
output_chem_fname = output_chem_fname.replace(".txt", ext + ".txt")
f = open(output_chem_fname, "w")
f.writelines("-------------------------------------------------------\n")
f.writelines("formation and destruction reactions \n")
f.writelines("species: " + species + "\n\n")
f.writelines("Formation reactions\n")
for i, ridx in enumerate(chemistry.fridxs):
f.writelines(chemistry.reac_to_str(self.chemnet.reactions[ridx - 1], i + 1))
f.writelines("\n")
f.writelines("\n\n")
f.writelines("Destruction reactions\n")
for i, ridx in enumerate(chemistry.dridxs):
f.writelines(chemistry.reac_to_str(self.chemnet.reactions[ridx - 1], i + 1))
f.writelines("\n")
f.writelines("-------------------------------------------------------\n")
f.close()
print("INFO: Writing information to: " + output_chem_fname)
# also print it to stdout
if screenout:
with open(output_chem_fname) as f:
print(f.read())
print(
f"INFO: Found {len(chemistry.fridxs)} formation and {len(chemistry.dridxs)} destruction reactions for species {species}."
)
print("INFO: Calc time: ", "{:4.2f}".format(timer() - start) + " s")
print("")
return chemistry
[docs]
def freezeout_timescale(self, species, alpha=1.0):
"""
Calculates the freezeout timescale for the given species, on the full grid using
Eq. 12 from `Rab+ (2017) <https://ui.adsabs.harvard.edu/abs/2017A%26A...604A..15R/abstract>`_ .
Parameters
----------
species : string
The chemical species for which the freezeout timescale should be calculated.
alpha : float
The sticking coefficient. Default: 1.0
Returns
-------
array_like(float,ndim=2)
The freezeout timescale for the given species in years. *DIMS:* (nx,nz)
.. todo::
- get sticking coefficient from parameter file. However, that might not be possible
if an equation is used.
"""
mw = (self.species.mass[species] * u.g) / (1.0 * u.u).cgs
rhodp = self.p_rho_grain
return (
2.9e-12
* mw.value**0.5
* 1.0
/ alpha
* rhodp
/ self.d2g
* self.tg ** (-0.5)
* self.da2mean
* 1.0
/ self.rhog
)
[docs]
def read_prodimo(
directory=".",
name=None,
readlineEstimates=True,
readObs=True,
readImages=True,
filename="ProDiMo.out",
filenameLineEstimates="FlineEstimates.out",
filenameLineFlux="line_flux.out",
filenameFLiTs="specFLiTs.out",
td_fileIdx: str | int = None,
):
"""
Reads in all (not all yet) the output of a |prodimo| model from the given model directory.
Parameters
----------
directory : str
the directory of the model (optional). One can use `~` for the home directory
name : str
an optional name for the model (e.g. can be used in the plotting routines)
readlineEstimates : boolean
read the line estimates file (can be slow, so it is possible to deactivate it)
readObs : boolean
try to read observational data (e.g. SEDobs.dat ...)
filename : str
the filename of the main prodimo output
filenameLineEstimates : str
the filename of the line estimates output
filenameLineFlux : str
the filename of the line flux output
filenameFLiTs : str
the filename of the flits output
td_fileIdx :
in case of time-dependent model the index of a particular output age can be provided (e.g. "03")
if it is an int try to expand the index properly to a string (currently 1 becomes "0001")
Returns
-------
:class:`prodimopy.read.Data_ProDiMo`
the |prodimo| model data or `None` in case of errors.
"""
startAll = timer()
if directory.startswith("~"):
directory = os.path.expanduser(directory)
# guess a name if not set
if name is None:
if directory is None or directory == "." or directory == "":
dirfields = os.getcwd().split("/")
else:
dirfields = directory.split("/")
if dirfields[-1] == "": # this is the case for path/
name = dirfields[-2]
else:
name = dirfields[-1]
data = Data_ProDiMo(name)
# Reads StarSpecrum.out
data.star = read_star(directory)
if data.star is not None:
data.lambs=data.star._lambs # can be None, but if RTinterpolation.out exists we can get it
else:
data.lambs=None
# if td_fileIdx is given alter the filenames so that the time-dependent
# files can be read. However this would actually also work with other kind
# of indices as td_fileIdx can be a string
if td_fileIdx is not None:
rpstr = _td_fileIdx_ext(td_fileIdx)
filename = filename.replace(".out", rpstr + ".out")
filenameLineEstimates = filenameLineEstimates.replace(".out", rpstr + ".out")
filenameLineFlux = filenameLineFlux.replace(".out", rpstr + ".out")
filenameFLiTs = filenameFLiTs.replace(".out", rpstr + ".out")
f, dummy = _getfile(filename, directory, suppressMsg=True)
# need to stop, means no ProDiMo.out, message is printed in getfiles
if f is None:
raise FileNotFoundError("ERROR: Could not read "+os.path.join(directory,filename))
# read all data into the memory; easier to handle afterwards
lines = f.readlines()
f.close()
# start of the array data block
idata = 24
# store the file idx for later use. That should work, even if it is a string, if not we have bigger problems.
if td_fileIdx is not None:
data.td_fileIdx = int(td_fileIdx)
else:
data.td_fileIdx = None
data.directory = directory
# Read first the species, because only from there I can know if the electron
# is included as Species or not
# data is filled in the routine
data.species = read_species(directory)
if data.species is not None: # None should not happen
data.nspec = len(data.species.mass)
# This can happen if there is not StarSpectrum.out. But we want to read the ProDiMo.out, anyway.
if data.star is None:
data.star = DataStar(0, 0, 0, 0, 0)
data.star.mass = float(lines[0].split()[1])
data.star.Lstar = float(lines[1].split()[1])
data.p_mdisk = float(lines[5].split()[1])
data.p_dust_to_gas = float(lines[9].split()[1])
data.p_rho_grain = float(lines[10].split()[1])
data.p_v_turb = float(lines[18].split()[1])
strs = lines[21].split()
data.nx = int(strs[1])
data.nz = int(strs[2])
# is not set in read_species() FIXME: not nice
nspecreal = int(strs[3])
if data.nspec is None: # should not happen
print("WARN: Setting data.nspec, because there is no Species.out")
data.nspec = nspecreal + 1 # +1 for the electron
data.nheat = int(strs[4])
data.ncool = int(strs[5])
data.nlam = int(strs[6])
# move this to the constructor which takes nx,nz
data.x = np.zeros(shape=(data.nx, data.nz))
data.z = np.zeros(shape=(data.nx, data.nz))
data.lams = np.zeros(shape=(data.nlam))
data.AV = np.zeros(shape=(data.nx, data.nz))
data.AVrad = np.zeros(shape=(data.nx, data.nz))
data.AVver = np.zeros(shape=(data.nx, data.nz))
data.NHver = np.zeros(shape=(data.nx, data.nz))
data.NHrad = np.zeros(shape=(data.nx, data.nz))
data.d2g = np.zeros(shape=(data.nx, data.nz))
data.tg = np.zeros(shape=(data.nx, data.nz))
data.td = np.zeros(shape=(data.nx, data.nz))
data.nd = np.zeros(shape=(data.nx, data.nz))
data.soundspeed = np.zeros(shape=(data.nx, data.nz))
data.rhog = np.zeros(shape=(data.nx, data.nz))
data.pressure = np.zeros(shape=(data.nx, data.nz))
data.solved_chem = np.zeros(shape=(data.nx, data.nz), dtype=np.int32)
data.tauchem = np.zeros(shape=(data.nx, data.nz))
data.taucool = np.zeros(shape=(data.nx, data.nz))
data.taudiff = np.zeros(shape=(data.nx, data.nz))
data.heat_mainidx = np.zeros(shape=(data.nx, data.nz), dtype=np.int32)
data.cool_mainidx = np.zeros(shape=(data.nx, data.nz), dtype=np.int32)
data.nHtot = np.zeros(shape=(data.nx, data.nz))
data.damean = np.zeros(shape=(data.nx, data.nz))
data.Hx = np.zeros(shape=(data.nx, data.nz))
data.tauX1 = np.zeros(shape=(data.nx, data.nz))
data.tauX5 = np.zeros(shape=(data.nx, data.nz))
data.tauX10 = np.zeros(shape=(data.nx, data.nz))
data.zetaX = np.zeros(shape=(data.nx, data.nz))
data.chi = np.zeros(shape=(data.nx, data.nz))
data.chiRT = np.zeros(shape=(data.nx, data.nz))
data.kappaRoss = np.zeros(shape=(data.nx, data.nz))
data.zetaCR = np.zeros(shape=(data.nx, data.nz))
data.zetaSTCR = np.zeros(shape=(data.nx, data.nz))
data.da2mean = np.zeros(shape=(data.nx, data.nz))
data.dNlayers = np.zeros(shape=(data.nx, data.nz))
data.velocity = np.zeros(shape=(data.nx, data.nz, 3))
data.chi1_shielded = np.zeros(shape=(data.nx, data.nz))
data.chi2_shielded = np.zeros(shape=(data.nx, data.nz))
data.chi1_unshielded = np.zeros(shape=(data.nx, data.nz))
data.chi2_unshielded = np.zeros(shape=(data.nx, data.nz))
data.heat_names = list()
data.cool_names = list()
data.rateH2form = np.zeros(shape=(data.nx, data.nz))
data.rateH2diss = np.zeros(shape=(data.nx, data.nz, 3))
# init the caches, to avoid conversion to float if not necessary
# Item size (S13) is required, 13 should be enough
data._radFields_cache = np.empty((data.nx, data.nz, data.nlam), dtype="S13")
data._nmol_cache = np.empty((data.nx, data.nz, data.nspec), dtype="S13")
data._heat_cache = np.empty((data.nx, data.nz, data.nheat), dtype="S13")
data._cool_cache = np.empty((data.nx, data.nz, data.ncool), dtype="S13")
# Make some checks for the format
# new EXP format for x and z:
newexpformat = lines[idata].find("E", 0, 25) >= 0 # type: ignore
# number of fixed fields in ProDiMo.out (before heating and cooling rates)
nfixFields = 21
# index after the J data/fields in ProDiMo
iACool = nfixFields + data.nheat + data.ncool
iASpec = iACool + 1 + data.nspec
# this means the e- is also included as a species, it is then also included
# in the output as the last element, simply skip that column then.
# in prodimopy e- was anyway treated as a species.
iASpecskip = 0
if nspecreal == data.nspec:
iASpecskip = 1
iAJJ = iASpec + iASpecskip + data.nlam
# read the species names, these are taken from the column titles
colnames = lines[idata - 1]
# FIXME: Again hardcoding. Might be better to read the Species names
# from Species.out. Assumes that the name for each species has len 13.
# Assume that the first species is e- and search for that
# that is more flexible in case other names change
# specStart = 233 + data.nheat * 13 + data.ncool * 13 + 13
# do not include the electron in the splitting because that one is special
specStart = colnames.find(" e-") + 13
# Deal with e- as species (el_is_sp=true), in that
# case it is included twice in the output but using nspec from ProDiMo.out
# should be fine (i.e. ignore the last column of the species which is then e- again).
spnames = colnames[specStart : specStart + (data.nspec - 1) * 13].split()
# now insert the electron again.
# but both are dictionaries so I guess it should be fine
spnames.insert(0, "e-") # type: ignore
if len(spnames) != data.nspec:
print("ERROR: something is wrong with the number of Species!")
return None
# empty dictionary
data.spnames = OrderedDict()
# Make a dictionary for the spnames
for i in range(data.nspec):
data.spnames[str(spnames[i])] = i
# read the heating and cooling names
iheat = idata + data.nx * data.nz + 2
for i in range(data.nheat):
data.heat_names.append(lines[iheat + i][3:].strip())
icool = idata + data.nx * data.nz + 2 + data.nheat + 2
for i in range(data.ncool):
data.cool_names.append(lines[icool + i][3:].strip())
# read the band wavelenghts
iwls = idata + data.nx * data.nz + 2 + data.nheat + 2 + data.ncool + 2
for i in range(data.nlam):
data.lams[i] = float((lines[iwls + i].split())[1])
i = 0
# startLoop=timer()
with tqdm(total=data.nz * data.nx, desc=f"READ: {filename}: ") as pbar:
for iz in range(data.nz):
zidx = data.nz - iz - 1
for ix in range(data.nx):
pbar.update(1)
# stupid workaround for big disks/envelopes where x,y can be larger than 10000 au
# there is no space between the numbers for x and z so always add one if none is there
# this might can be removed in the future as the newest ProDiMo version use exp format now
line = lines[idata + i]
if not newexpformat:
if line[20] != " ":
line = line[:20] + " " + line[20:] # type: ignore
# needs another fix
if line[8] != " ":
line = line[:8] + " " + line[8:] # type: ignore
# This line is what eats the time, but using genfromtxt for the ProDiMo.out
# does not seem to be faster
# fields=np.fromstring(line,sep=" ")
fields = line.split()
data.x[ix, zidx] = fields[2]
data.z[ix, zidx] = fields[3]
data.NHrad[ix, zidx] = fields[4]
data.NHver[ix, zidx] = fields[5]
data.AVrad[ix, zidx] = fields[6]
data.AVver[ix, zidx] = fields[7]
data.nd[ix, zidx] = fields[8]
data.tg[ix, zidx] = fields[9]
data.td[ix, zidx] = fields[10]
data.soundspeed[ix, zidx] = fields[11]
data.rhog[ix, zidx] = fields[12]
data.pressure[ix, zidx] = fields[13]
data.solved_chem[ix, zidx] = fields[14]
data.chi[ix, zidx] = fields[15]
data.tauchem[ix, zidx] = fields[16]
data.taucool[ix, zidx] = fields[17]
data.taudiff[ix, zidx] = fields[18]
data.heat_mainidx[ix, zidx] = fields[19]
data.cool_mainidx[ix, zidx] = fields[20]
data._heat_cache[ix, zidx, :] = fields[nfixFields : nfixFields + data.nheat]
data._cool_cache[ix, zidx, :] = fields[
(nfixFields + data.nheat) : (nfixFields + data.nheat + data.ncool)
]
data.nHtot[ix, zidx] = fields[iACool]
data._nmol_cache[ix, zidx, :] = fields[iACool + 1 : iASpec]
data._radFields_cache[ix, zidx, :] = fields[(iASpec + iASpecskip) : iAJJ]
data.chiRT[ix, zidx] = fields[iAJJ]
data.kappaRoss[ix, zidx] = fields[iAJJ + 1]
data.damean[ix, zidx] = fields[iAJJ + 3]
data.d2g[ix, zidx] = fields[iAJJ + 4]
data.Hx[ix, zidx] = fields[iAJJ + 5]
data.tauX1[ix, zidx] = fields[iAJJ + 6]
data.tauX5[ix, zidx] = fields[iAJJ + 7]
data.tauX10[ix, zidx] = fields[iAJJ + 8]
data.zetaX[ix, zidx] = fields[iAJJ + 9]
data.rateH2form[ix, zidx] = fields[iAJJ + 10]
data.rateH2diss[ix, zidx, :] = fields[iAJJ + 11 : iAJJ + 11 + 3]
data.zetaCR[ix, zidx] = fields[iAJJ + 16]
# FIXME: maybe clean that up at some point, e.g. making the check for every line is not nice
# do most of it ouside of the loop and have only a simply if here
if len(fields) > (iAJJ + 17):
data.zetaSTCR[ix, zidx] = fields[iAJJ + 17]
if len(fields) > (iAJJ + 18):
data.da2mean[ix, zidx] = fields[iAJJ + 18]
# it seems Nlayers can sometimes become crazy numbers then e.g +100 is in the file without an e
data.dNlayers[ix, zidx] = _convToFloatCheck(fields[iAJJ + 19], ix, zidx, "dNlayers")
if len(fields) > (iAJJ + 20):
data.velocity[ix, zidx, :] = fields[iAJJ + 20 : iAJJ + 23]
if len(fields) > (iAJJ + 23):
data.chi1_unshielded[ix,zidx]=fields[iAJJ + 23]
data.chi2_unshielded[ix,zidx]=fields[iAJJ + 24]
data.chi1_shielded[ix,zidx]=fields[iAJJ + 25]
data.chi2_shielded[ix,zidx]=fields[iAJJ + 26]
else:
data.chi1_unshielded=None
data.chi2_unshielded=None
data.chi1_shielded=None
data.chi2_shielded=None
else:
data.velocity = None # for backwards compatibility, as it is a new field
else:
data.velocity = None # for backwards compatibility, as it is a new field
data.da2mean = None
data.dNlayers = None
data.chi1_unshielded=None
data.chi2_unshielded=None
data.chi1_shielded=None
data.chi2_shielded=None
i = i + 1
# endLoop=timer()
# derived quantitites
data.rhod = data.rhog * data.d2g
# AV like defined in the prodimo idl script
for ix in range(data.nx):
for iz in range(data.nz):
data.AV[ix, iz] = np.min(
[
data.AVver[ix, iz],
data.AVrad[ix, iz],
data.AVrad[data.nx - 1, iz] - data.AVrad[ix, iz],
]
)
# Read FlineEstimates.out
if readlineEstimates:
read_lineEstimates(directory, data, filename=filenameLineEstimates)
else:
data.lineEstimates = None
data.elements = read_elements(directory)
data.dust = read_dust(directory)
if data.dust is not None:
data.dust.fsize=read_dust_fsize(directory,data)
data.dust.fsize_rho=read_dust_fsize(directory,data,filename="dust_fsize_rho.out")
fileloc = directory + "/dust_opac_env.out"
if os.path.isfile(fileloc):
data.env_dust = read_dust(directory, filename="dust_opac_env.out")
data.dust_charge_mean,data.dust_charge_min,data.dust_charge_max = read_dust_charge(directory)
if os.path.exists(directory + "/gas_cs.out"):
data.gas = read_gas(directory)
if (data.lambs is None):
# quick hack to fill lambs, the grid is taken for gas_cs_band.out, if we don't have it already
fbandgas=os.path.join(directory ,"gas_cs_band.out")
if os.path.exists(fbandgas):
vals=np.loadtxt(fbandgas,comments="#",skiprows=2,usecols=(0,))
# just get the edges, last one needs to be added by hand
data.lambs=vals[0::3]
data.lambs=np.append(data.lambs,vals[-1])
if os.path.exists(directory + "/" + filenameLineFlux):
data.lines = read_linefluxes(directory, filename=filenameLineFlux)
if os.path.exists(directory + "/SED.out"):
data.sed = read_sed(directory)
if readObs:
data.sedObs = read_continuumObs(directory)
if data.lines is not None:
if os.path.exists(directory + "/LINEobs.dat"):
data.lineObs = read_lineObs(directory, len(data.lines))
# FIXME: workaround set also the frequency for the observed lline
if data.lineObs is not None:
for i in range(len(data.lines)):
data.lineObs[i].frequency = data.lines[i].frequency
if data.lineObs[i].profile is not None:
data.lineObs[i].profile.restfrequency = data.lineObs[i].frequency
if readImages:
data.contImages = read_continuumImages(directory)
# FIXME: not very nice, if one calls the routine directly the inclinations will not be set
if data.contImages is not None:
data.contImages._inclinations = data.sed._inclinations
if os.path.exists(directory + "/" + filenameFLiTs):
data.FLiTsSpec = read_FLiTs(directory, filenameFLiTs)
if os.path.exists(directory + "/Parameter.out"):
data.params = ModelParameters(filename="Parameter.out", directory=directory)
# Also set the age of the model if it is a time-dependent model
if data.td_fileIdx is not None:
data.age = data.params["AGE_DISK"][data.td_fileIdx - 1]
else:
data.age = np.inf # steady state model
# Try to read some special files
_read_12CO13CO_ratio(data)
# print("INFO: Loop time: ","{:4.2f}".format(endLoop-startLoop)+" s")
print("INFO: Reading time: ", "{:4.2f}".format(timer() - startAll) + " s")
# set muH
data.muH = data.rhog[0, 0] / data.nHtot[0, 0]
print(" ")
return data
def _convToFloatCheck(valstr: str, ix: int, iz: int, fieldname: str) -> float:
"""
Converts a string to a float but catches ValueErrors and deals with it.
Catches the case with wrong exponential format eg. 1.0089+100 that can happen
in |prodimo| output files. It should not happen, but this as a good workaround to
still be able to read the data.
Parameters:
-----------
valstr : str
the string to convert
ix : int
the x index (just for error message)
iz : int
the z index (just for error message)
fieldname : str
the name of the field (as e.g. in the ProDiMo.out)
"""
try:
val = float(valstr)
except ValueError as e:
if "+" in valstr:
val = 1.0e99 # assumes a large number
elif "-" in valstr:
val = 1.0e-99 # assumes a very small number
else:
val = np.nan
# Check for negative sign
if valstr.startswith("-"):
val = -val
print(f"WARN: Could not convert {fieldname} at ix={ix}, iz={iz}, (E: {e}) set it to {val}")
return val
def _td_fileIdx_ext(td_fileIdx: str | int) -> str:
"""
Returns the extension for the time-dependent file index.
This should be used to build the filename for the time-dependent output files.
Parameters
----------
td_fileIdx : str or int
the time-dependent file index, e.g. "03" or 3. If it is an `int` it will be formatted
to a 4 digit string with leading zeros.
Returns
-------
str : the extension for the time-dependent file index, e.g. "_0003.out".
"""
if td_fileIdx is not None:
if isinstance(td_fileIdx, int):
td_fileIdx = "{:04d}".format(td_fileIdx)
return "_" + td_fileIdx
else:
return ""
[docs]
def read_elements(directory, filename="Elements.out"):
"""
Reads the Elements.out file.
Also adds the electron "e-" to the Elements.
Parameters
----------
directory : str
the directory of the model
filename: str
an alternative Filename
Returns
-------
:class:`~prodimopy.read.DataElements`
the Elements model data or `None` in case of errors.
"""
f, dummy = _getfile(filename, directory)
if f is None:
return None
nelements = int(f.readline())
elements = DataElements()
f.readline()
for i in range(nelements):
line = f.readline()
fields = line.strip().split()
name = fields[0].strip()
# FIXME: Workaround with the quadrupole solver an element ch is included
# don't know what to do with that.
if name == "ch":
continue
elements.abun12[name] = float(fields[1])
elements.abun[name] = 10.0 ** (float(fields[1]) - 12.0)
elements.amu[name] = float(fields[2])
elements.massRatio[name] = float(fields[3])
# also read muH (last element of the string)
elements.muHamu = float(f.readline().strip().split()[-1])
return elements
[docs]
def read_species(directory, filename="Species.out") -> DataSpecies | None:
"""
Reads the Species.out file.
Also adds the electron "e-" if necessary.
Also sets data.nspec as only here I can know if e- was a real species in
|prodimo| or not. If it was a real species I don't need to add that.
Parameters
----------
directory : str
the directory of the model
pdata : :class:`prodimopy.read.Data_ProDiMo`
the |prodimo| Model data structure, where the data is stored
filename: str
an alternative Filename
Returns
-------
:class:`~prodimopy.read.DataSpecies`
the Species model data or `None` in case of errors.
..todo :
* stochiometry is still missing
"""
f, dummy = _getfile(filename, directory)
if f is None:
return None
species = DataSpecies()
# skip the first line
f.readline()
species.mass = OrderedDict()
species.charge = OrderedDict()
species.chemPot = OrderedDict()
for line in f:
fields = line.strip().split()
spname = fields[2].strip()
species.mass[spname] = (float(fields[3].strip()) * u.u).cgs.value
species.charge[spname] = int(fields[4].strip())
species.chemPot[spname] = float(fields[5].strip())
# Only do this if it does not exist yet.
# This is the only place where one can know if the e- is a real species
# in ProDiMo or not (well could also use Parameter.out)
if "e-" not in species.mass.keys():
species.mass["e-"] = const.m_e.cgs.value
species.charge["e-"] = -1
species.chemPot["e-"] = 0.0
else:
# overwrite the mass, as it is not correct due to rounding in Species.out
species.mass["e-"] = const.m_e.cgs.value
# electron should be in first place like in ProDiMo.out
species.mass.move_to_end("e-", last=False)
species.charge.move_to_end("e-", last=False)
species.chemPot.move_to_end("e-", last=False)
return species
[docs]
def read_lineEstimates(directory, pdata, filename="FlineEstimates.out"):
"""
Read FlineEstimates.out Can only be done after |prodimo|.out is read.
Parameters
----------
directory : str
the directory of the model
pdata : :class:`prodimopy.read.Data_ProDiMo`
the |prodimo| Model data structure, where the data is stored
filename: str
an alternative Filename
"""
f, rfile = _getfile(filename, directory, binary=True, suppressMsg=True)
if f is None:
pdata.lineEstimates = None
return None
else:
pdata._fpFlineEstimates = rfile
# check for version
line = f.readline().decode()
version = 1
if len(line) > 68:
# position of version
posver = line.find("version")
if posver >= 0:
version = int(line[posver + len("version") :].strip())
nlines = int(f.readline().strip())
f.readline()
pdata.lineEstimates = list()
pdata._versionFlineEstimates = version
nxrange = list(range(pdata.nx))
startBytes = 0
for i in tqdm(range(nlines), f"READ: {filename}"):
# has to be done in fixed format
# It could be that nline is not really equal the number of available lines
# this is might be a bug in ProDiMo but maybe intended (see
# OUTPUT_FLINE_ESTIMATE in ProDiMo). Add a check to be sure
line = f.readline()
if not line:
break
line = line.decode()
if version == 1:
le = DataLineEstimate(
(line[0:9]).strip(),
float(line[10:28]),
int(line[29:32]),
int(line[33:37]),
float(line[38:49]),
)
elif version == 2 or version == 3:
try:
le = DataLineEstimate(
(line[0:9]).strip(),
float(line[10:28]),
int(line[29:34]),
int(line[35:40]),
float(line[41:53]),
)
except ValueError as err:
print("Conversion problems: {0}".format(err))
print("Line (i,text): ", i, line)
print("Field: ", line[0:9], line[10:28], line[29:34], line[35:40], line[41:53])
raise err
else:
raise ValueError("Unknown version of FlineEstimates.out! version=" + str(version))
# Find out the number of bytes in one radial line to use seek in the
# next iterations
if i == 0:
start = f.tell()
le._posrInfo = start # store the position for reading this info if required
for j in nxrange:
f.readline()
startBytes = f.tell() - start
else:
le._posrInfo = f.tell()
f.seek(startBytes, 1)
# Add a reference, is needed for the lazy reading of the rInfo data
le._model=pdata
pdata.lineEstimates.append(le)
f.close()
def _read_12CO13CO_ratio(pdata: Data_ProDiMo, fname: str = "C13O_C12O_ratio.out"):
"""
Reads the file ``C13O_C12O_ratio.out``, if it exists, and fills
:attr:`~prodimo.read.Data_ProDiMo.isoratio_12CO13CO
"""
if not os.path.exists(os.path.join(pdata.directory, fname)):
return
f, dummy = _getfile(fname, pdata.directory, suppressMsg=False)
if f is None:
return None
ratio = np.loadtxt(f, skiprows=1, usecols=(2))
pdata.isoratio_12CO13CO = np.flip(np.reshape(ratio, (pdata.nx, pdata.nz), order="F"), axis=1)
f.close()
return
[docs]
def read_linefluxes(directory, filename="line_flux.out"):
"""Reads the line fluxes output. Can deal with multiple inclinations.
Parameters
----------
directory : str
the model directory.
filename : str
the filename of the output file (optional).
Returns
-------
list(:class:`prodimopy.read.DataLine`)
List of lines.
"""
f, dummy = _getfile(filename, directory)
if f is None:
return None
records = f.readlines()
f.close()
dist = float(records[0].split("=")[1]) # type: ignore
nlines = int(records[2].split("=")[1]) # type: ignore
nvelo = int(records[3].split("=")[1]) # type: ignore
lines = list()
# determine if there are multiple inclinations
# likely very slow
ninc = sum("inclination[degree]" in s for s in records) # type: ignore
# this might be faster, but also a bit dirtz
# nrecords=len(records)
# nlinesinc=4+nlines*(5+nvelo)+2
# print(nrecords,nlinesinc,(nrecords+2)/nlinesinc)
pos = 5
# loop for all inclinations
for iinc in range(ninc):
# print("start",iinc,pos,records[pos])
incl = float(records[pos - 4].split("=")[1]) # type: ignore
# print(incl)
# print()
# loop over all lines
for i in range(nlines):
# read the data for one line this the same data for inclinations
if iinc == 0:
line = DataLine()
rec = records[pos]
# try a new method that should work for all versions of line_flux output
# deals now also with eh _lte stuff (although hardcoded)
try:
fields = rec.split()
line.ident = fields[1].strip()
# extract the species
# TODO: nothing done for e.g. OI OII etc. could also be OI_LTE
if line.ident.endswith(("_lte", "_LTE")):
line.species = line.ident[:-4]
# Hitran molecules (check for isotopologues)
elif line.ident.endswith(("_H")):
line.species = line.ident[:-2]
else:
line.species = line.ident
line.prodimoInf = fields[2].strip() + fields[3].strip() # type: ignore
line.wl = float(fields[6].strip()[:-3]) # get rid of the mic unit
line.frequency = float(fields[9].strip()[:-3]) # get rid of the GHz
except: # noqa: E722
try: # FIXME: those could be removed, the general way should work for all of them
print("WARN: read_linefluxes: try older format")
line.species = (rec[10:20]).strip()
line.ident = line.species
line.prodimoInf = rec[21:41].strip()
line.wl = float(rec[48:64].strip()) # *u.um
line.frequency = float(rec[74:90].strip()) # *u.GHz
except: # noqa: E722
print("WARN: read_linefluxes: try even older format")
line.species = (rec[10:20]).strip()
line.ident = line.species
line.prodimoInf = rec[21:36].strip()
line.wl = float(rec[43:54].strip()) # *u.um
line.frequency = float(rec[63:76].strip()) # *u.GHz
line.distance = dist
else:
# get the line that was already initialised
line = lines[i]
# thisis data the changes for each inclination
rec = records[pos + 1]
line._fluxs.append(float(rec.split("=")[1].strip())) # type: ignore *u.Watt/u.m**2
rec = records[pos + 2]
line._fconts.append(float(rec.split("=")[1].strip())) # type: ignore *u.Jansky
# simply store inclination and distance for each line
line._inclinations.append(incl)
# one line in the profile is for the continuum
profile = DataLineProfile(nvelo - 1, restfrequency=line.frequency)
for j in range(nvelo - 1):
# skip the header line and the first line (continuum)?
fields = records[pos + 5 + j].split()
profile.velo[j] = float(fields[0]) # *u.km/u.s
profile.flux[j] = float(fields[1]) # *u.Jansky
if len(fields) > 2:
profile.flux_conv[j] = float(fields[2]) # *u.Jansky
if len(fields) > 3:
profile.flux_dust[j] = float(fields[3]) # *u.Jansky
line._profiles.append(profile)
# don't add the object again
if iinc == 0:
lines.append(line)
pos += nvelo + 5
pos += 6
return lines
[docs]
def read_lineObs(directory, nlines, filename="LINEobs.dat"):
"""
Reads the lineobs Data. the number of lines have to be known.
"""
f, dummy = _getfile(filename, directory)
if f is None:
return None
records = f.readlines()
f.close()
linesobs = list()
versionStr = records[0].split()
version = float(versionStr[0])
for rec in records[2 : 2 + nlines]:
fields = rec.split()
lineobs = DataLineObs(
float(fields[0].strip()),
float(fields[1].strip()),
float(fields[2].strip()),
float(fields[3].strip()),
fields[4].strip(),
)
# in case of upper limits flux might be zero in that case use sig.
if lineobs.flux < 1.0e-100:
lineobs.flux = lineobs.flux_err
linesobs.append(lineobs)
# the additional data
# check if there is actually more data
if len(records) > 2 + nlines + 1:
# FIXME: do this proberly (the reading etc. and different versions)
profile = (records[2 + nlines + 1].split())[0:nlines]
# autoC = (records[2 + nlines + 2].split())[0:nlines]
# vvalid = (records[2 + nlines + 3].split())[0:nlines]
# speres = (records[2 + nlines+4].split())[0:nlines]
offset = 5
if version > 2.0:
offset = 6
# now go through the profiles
for i in range(nlines):
proffilename = records[offset + nlines + i + 1].strip()
if profile[i] == "T":
if "nodata" == proffilename:
print("WARN: Something is wrong with line " + str(i))
linesobs[i].profile, linesobs[i].profileErr = read_lineObsProfile(
proffilename, directory=directory
)
return linesobs
[docs]
def read_lineObsProfile(filename, directory="."):
"""
reads a line profile file which can be used for |prodimo|
"""
f, dummy = _getfile(filename, directory)
if f is None:
return None
records = f.readlines()
f.close()
# First line number of velo points and central wavelength, which we do not
# need here (I think this is anyway optional
nvelo = int(records[0].split()[0].strip())
# skip header lines
profile = DataLineProfile(nvelo)
for i in range(2, nvelo + 2):
fields = records[i].split()
profile.velo[i - 2] = float(fields[0].strip())
profile.flux[i - 2] = float(fields[1].strip())
# also fill the convolved flux, which is just the same as the flux
# for observations
profile.flux_conv[i - 2] = profile.flux[i - 2]
# Check if there is actually an error field.
if (nvelo + 2) >= len(records):
err = 0.0
else:
if records[nvelo + 2].strip() != "":
err = float(records[nvelo + 2].split()[0].strip())
else:
err = 0.0
return profile, err
[docs]
def read_gas(directory, filename="gas_cs.out"):
"""
Reads gas_cs.out
Returns
-------
:class:`~prodimopy.read.DataGas`
"""
f, dummy = _getfile(filename, directory)
if f is None:
return None
nlam = int(f.readline().strip())
# skip one line
f.readline()
gas = DataGas(nlam)
for i in range(nlam):
fields = [float(field) for field in f.readline().split()]
gas.lam[i] = float(fields[0])
gas.abs_cs[i] = float(fields[1])
gas.sca_cs[i] = float(fields[2])
gas.energy[:] = ((gas.lam[:] * u.micron).to(u.eV, equivalencies=u.spectral())).value
f.close()
return gas
[docs]
def read_continuumImages(directory, filename="image.out"):
"""
Reads the image.out file.
To avoid unnecessary memory usage, the Intensities are not read in this
routine. For this use :func:`~prodimopy.read.DataContinuumImages.get_image`
"""
f, fname = _getfile(filename, directory=directory)
if f is None:
return None
incl = float(f.readline().split()[0])
nrnt = f.readline().split()
nlam = f.readline().split()[0]
# FIXME: passing incl ist not useful in case of multiple inclinations
images = DataContinuumImages(int(nlam), int(nrnt[0]), int(nrnt[1]), incl=incl, filepath=fname)
images.lams[:] = np.array(f.readline().split()).astype(float)
f.close()
# Read the coordinates, that is just for the first inclination
xy = np.loadtxt(fname, skiprows=6, usecols=(2, 3), max_rows=images.nx * images.ny)
images.x = xy[:, 0].reshape(images.nx, images.ny)
images.y = xy[:, 1].reshape(images.nx, images.ny)
return images
[docs]
def read_continuumObs(directory, filename="SEDobs.dat"):
"""
Read observational continuum data (SED).
Looks for and reads the following files:
- ``SEDobs.dat`` photometric data points.
- ``PREFIXspec.dat`` where prefix can be `Spitzer, ISO, PACS, SPIRE, JWST`
- ``extinct.dat`` extinction data used to redden the simulated SED.
All files are optional. The information is then used to e.g. overplot the observational data
in :func:`~prodimopy.plot.Plot.plot_sed`).
"""
contObs = None
rfile = directory + "/" + filename
if os.path.exists(rfile):
print("READ: Reading File: ", rfile, " ...")
f = open(rfile, "r")
nlam = int(f.readline().strip())
f.readline() # header line
contObs = DataContinuumObs(nlam=nlam)
for i in range(nlam):
elems = f.readline().split()
contObs.lam[i] = float(elems[0])
contObs.fnuJy[i] = float(elems[1])
contObs.fnuJyErr[i] = float(elems[2])
contObs.flag[i] = str(elems[3])
contObs.nu = (contObs.lam * u.micrometer).to(u.Hz, equivalencies=u.spectral()).value
contObs.fnuErg = (contObs.fnuJy * u.Jy).cgs.value
contObs.fnuErgErr = (contObs.fnuJyErr * u.Jy).cgs.value
f.close()
# check for the spectrum files
# types of spectra that are understood, is more of a unofficial naming convention
types = ["Spitzer", "ISO", "PACS", "SPIRE", "JWST"]
fnames = list()
for spectype in types:
fnames.extend(glob.glob(directory + "/" + spectype + "*spec*.dat"))
if fnames is not None and len(fnames) > 0:
if contObs is None:
contObs = DataContinuumObs()
contObs.specs = list()
for fname in fnames:
print("READ: Reading File: ", fname, " ...")
# use basename in the if to avoid problems with directory names
# containing the string of one of the types.
if types[0] in os.path.basename(fname):
spec = np.loadtxt(fname, skiprows=3)
elif types[1] in os.path.basename(fname):
spec = np.loadtxt(fname, skiprows=1)
elif types[2] in os.path.basename(fname):
spec = np.loadtxt(fname)
elif types[3] in os.path.basename(fname):
spec = np.loadtxt(fname, skiprows=1)
# convert frequency to micron, SPIRE data
spec[:, 0] = (spec[:, 0] * u.GHz).to(u.micron, equivalencies=u.spectral()).value
elif types[4] in os.path.basename(fname):
spec = np.loadtxt(fname, skiprows=3)
else:
print("Don't know about type of " + fname + " Spectrum. Try anyway")
spec = np.loadtxt(fname)
# If there is no error provided just add a zero column
if spec.shape[1] < 3:
spec = np.c_[spec, np.zeros(spec.shape[0])]
contObs.specs.append(spec)
# check if there is some extinction data available
fn_ext = directory + "/extinct.dat"
if os.path.exists(fn_ext):
print("READ: Reading File: ", fn_ext, " ...")
if contObs is None:
contObs = DataContinuumObs()
fext = open(fn_ext, "r")
fext.readline()
contObs.E_BV = float(fext.readline())
fext.readline()
contObs.R_V = float(fext.readline())
contObs.A_V = contObs.R_V * contObs.E_BV
fext.close()
# # check if there is an image.in
# fn_images= directory+"/image.in"
# if os.path.exists(fn_images):
# if contObs is None:
# contObs=DataContinuumObs()
#
# fext= open(fn_images,"r")
# fext.readline()
# fext.readline()
# nimages=int(fext.readline())
# positionAngle = float(fext.readline())
#
# for i in range(9):
# line = f.readline()
return contObs
[docs]
def read_FLiTs(directory, filename="specFLiTs.out"):
"""
Reads the FLiTs spectrum.
Returns
-------
:class:`prodimopy.read.DataFLiTsSpec`
The Spectrum.
"""
f, pfilename = _getfile(filename, directory)
if f is None:
return None, None
f.close()
# currently read only the wavelength and the total flux and the continuum (col 3)
# doesn't work for including the continuum
#
# likely slow, just to get the number of entries
f = open(pfilename)
lines = f.readlines()
nent = len(lines)
specFLiTs = DataFLiTsSpec()
specFLiTs.wl = np.zeros(shape=(nent))
specFLiTs.flux = np.zeros(shape=(nent))
specFLiTs.flux_cont = np.zeros(shape=(nent))
for i, line in enumerate(lines):
arr = line.split()
# sometimes only the first two columns a filled, deal with it
specFLiTs.wl[i] = float(arr[0])
specFLiTs.flux[i] = float(arr[1])
# fill the continuum just from the first column as it is sometime empty
# FIXME: not sure while sometimes FLiTs does not produce a continuum value, this is just a workaround
if len(arr) == 2:
specFLiTs.flux_cont[i] = float(arr[1])
else:
specFLiTs.flux_cont[i] = float(arr[3])
f.close()
return specFLiTs
[docs]
def read_sed(directory, filename="SED.out", filenameAna="SEDana.out"):
"""
Reads the |prodimo| SED output including the analysis data.
"""
f, dummy = _getfile(filename, directory)
if f is None:
return None
records = f.readlines()
f.close()
# determine if there are multiple inclinations
# likely very slow
# -1 becaus we ignore the analytic one
ninc = sum("inclination[degree]" in s for s in records) - 1
distance = float(records[0].split()[-1])
nlam = int(records[2])
sed = DataSED(nlam, distance)
# read the face-on SED, useful for generating escape probability spectra
for i in range(4,nlam+4):
sed.fnuFaceonErg[i-4]=float(records[i].split()[2])
ridx = nlam + 2 + 3
# need this already here
sed._Lbols = [None] * ninc
sed._Tbols = [None] * ninc
for iinc in range(ninc):
sed._inclinations.append(float(records[ridx].split()[-1]))
ridx += 3
fnuErg = np.zeros(shape=(nlam))
nuFnuW = np.zeros(shape=(nlam))
fnuJy = np.zeros(shape=(nlam))
for i in range(nlam):
elems = records[ridx].split()
if iinc == 0:
sed.lam[i] = float(elems[0])
sed.nu[i] = float(elems[1])
# FIXME: Workaround to catch strange values from ProDiMo .. should be fixed
# in ProDiMo
try:
fnuErg[i] = float(elems[2])
nuFnuW[i] = float(elems[3])
fnuJy[i] = float(elems[4])
except ValueError as err:
print("WARN: Could not read value from SED.out: ", err)
ridx += 1
sed._fnuErgs.append(fnuErg)
sed._nuFnuWs.append(nuFnuW)
sed._fnuJys.append(fnuJy)
sed._incidx = iinc
sed.setLbolTbol()
# next inc
ridx += 1
sed._incidx = 0 # reset to the first inclination
# The analysis data, if it is not there not a big problem
f, dummy = _getfile(filenameAna, directory)
if f is None:
sed.sedAna = None
return sed
nlam, nx = f.readline().split()
nlam = int(nlam)
nx = int(nx)
sed.sedAna = DataSEDAna(nlam, nx)
for i in range(nlam):
elems = f.readline().split()
sed.sedAna.lams[i] = float(elems[0])
sed.sedAna.r15[i] = float(elems[1])
sed.sedAna.r85[i] = float(elems[2])
for j in range(nx):
elems = f.readline().split()
sed.sedAna.z15[i, j] = float(elems[0])
sed.sedAna.z85[i, j] = float(elems[1])
f.close()
return sed
[docs]
def read_star(directory: str, filenameSpec: str ="StarSpectrum.out", filenameRTinterpol: str ="RTinterpolation.out"):
"""
Reads `StarSpectrum.out` and `RTinterpolation.out` if it exists.
Calls :func:`~prodimopy.read.read_starSpec` for the `StarSpectrum.out`
Parameters
----------
directory :
The model directory.
filenameSpec :
The filename for the star spectrum. Default: "StarSpectrum.out"
filenameRTinterpol :
The filename for the RT interpolation data. Default: "RTinterpolation.out"
"""
# Also creates the star object
# FIXME: is not so nice, but comes from old implementation for starSpec
star=read_starSpec(directory,filename=filenameSpec)
# check to avoid confusing warning message, it is not a required file.
if not os.path.exists(os.path.join(directory, filenameRTinterpol)):#
return star
f, dummy = _getfile(filenameRTinterpol, directory)
if f is None:
return star
lines= f.readlines()
nbands=int(lines[1])
star._lambs=np.array([float(line.split()[0]) for line in lines[2:2+nbands+1]])
star.InuBand=np.array([float(line.split()[2]) for line in lines[2+nbands+1:2+2*nbands+1]])
# fill interpolated data,can be different to Starlam actually
if (len(lines[2+2*nbands+1+1].split()) < 4): # older versions of RTinterpol might have truncated lines check for that.
# just use every second line, as long as the line has still the first 3 columns we are fine
star.wlInterpol=np.array([float(line.split()[0]) for line in lines[2+2*nbands+1+1::2]])
star.InuInterpol=np.array([float(line.split()[2]) for line in lines[2+2*nbands+1+1::2]])
else:
star.wlInterpol=np.array([float(line.split()[0]) for line in lines[2+2*nbands+1+1:]])
star.InuInterpol=np.array([float(line.split()[2]) for line in lines[2+2*nbands+1+1:]])
f.close()
return star
[docs]
def read_starSpec(directory: str, filename: str ="StarSpectrum.out"):
"""
Reads StarSpectrum.out
Parameters
----------
directory :
The model directory.
filename :
The filename. Default: "StarSpectrum.out"
"""
f, dummy = _getfile(filename, directory)
if f is None:
return None
teff = float((f.readline().split())[-1])
elems = f.readline().split()
r = float(elems[-1])
logg = float(elems[2])
luv = float(f.readline().split()[-1])
nlam = int(f.readline())
f.readline()
starSpec = DataStar(nlam, teff, r, logg, luv)
for i in range(nlam):
elems = f.readline().split()
starSpec.lam[i] = float(elems[0])
starSpec.nu[i] = float(elems[1])
starSpec.Inu[i] = float(elems[2])
f.close()
return starSpec
[docs]
def read_bgSpec(directory, filename="BgSpectrum.out"):
"""
Reads the BgSpectrum.out file.
Returns
-------
:class:`prodimopy.read.DataBgSpec`
the background spectra or `None` if not found.
"""
f, dummy = _getfile(filename, directory)
if f is None:
return None
nlam = int(f.readline())
f.readline()
bgSpec = DataBgSpec(nlam)
for i in range(nlam):
elems = f.readline().split()
bgSpec.lam[i] = float(elems[0])
bgSpec.nu[i] = float(elems[1])
bgSpec.Inu[i] = float(elems[2])
return bgSpec
[docs]
def read_dust(directory, filename="dust_opac.out"):
"""
Reads `dust_opac.out` and if it exists `dust_sigmaa.out`.
Does not read the dust composition yet.
Returns
-------
:class:`~prodimopy.read.DataDust`
"""
f, dummy = _getfile(filename, directory)
if f is None:
return None
fields = [int(field) for field in f.readline().split()]
ndsp = fields[0]
nlam = fields[1]
# skip three lines
for i in range(ndsp):
f.readline()
# apow amax etc.
strings = f.readline().split()
if len(strings) > 0:
amin = ((float(strings[6]) * u.cm).to(u.micron)).value
amax = ((float(strings[7]) * u.cm).to(u.micron)).value
apow = float(strings[8])
nsize = int(strings[9])
dust = DataDust(amin, amax, apow, nsize, nlam)
else:
dust = DataDust(-1, -1, 0.0, -1, nlam)
f.readline()
for i in range(nlam):
fields = [float(field) for field in f.readline().split()]
dust.lam[i] = float(fields[0])
dust.kext[i] = float(fields[1])
dust.kabs[i] = float(fields[2])
dust.ksca[i] = float(fields[3])
if len(fields) > 4:
dust.ksca_an[i] = float(fields[5]) # skip kprn
f.close()
dust.energy[:] = ((dust.lam[:] * u.micron).to(u.eV, equivalencies=u.spectral())).value
# is optional but returns None if file is not there
# does not have to be there
dust.asize, dust.sigmaa = read_dust_sigmaa(directory)
return dust
[docs]
def read_dust_sigmaa(directory, filename="dust_sigmaa.out") -> tuple[NDArray, NDArray]:
"""
Reads the radial surface density profiles for each grain size.
Parameters
----------
directory : str
The directory where the file is located (the |prodimo| model directory).
filename : str
The filename. Default: "dust_sigmaa.out"
"""
if not os.path.isfile(os.path.join(directory, filename)):
return None, None
f, pfilename = _getfile(filename, directory)
if f is None:
return None, None
f.close() # don't need it use np for simplicity
# a bit quick an dirty
data = np.loadtxt(pfilename, comments="#")
# the first row in the array are the grain sizes.
# convert from cm to micron
asize = (data[0, :] * u.cm).to(u.micron).value
# used transposed array, to be consistent with ohter stuff.
sigmaa = np.transpose(data[1:, :])
return asize, sigmaa
[docs]
def read_dust_fsize(directory: str, pdata: Data_ProDiMo, filename: str ="dust_fsize.out") -> NDArray[np.float64]:
'''
Reads the dust size distribution function if available (either dust_fisize.out or dust_fsize_rho.out).
Parameters
----------
directory :
The directory where the file is located (the |prodimo| model directory).
pdata :
The model object.
filename :
The filename. Default: "dust_fsize.out"
Returns
-------
NDArray
Dust size distribution funciton DIM: (na,nx,nz) na is the number of grain sizes.
'''
# is not required so check if it is there
if not os.path.isfile(os.path.join(directory,filename)):
return None
f, dummy = _getfile(filename, directory)
if f is None:
return None
lines=f.readlines()
istart=0
for line in lines:
if not line.strip().startswith("#"):
break
else:
istart+=1
na=pdata.dust.nsize
fsize=np.zeros(shape=(na,pdata.nx,pdata.nz))
for ia in range(na):
for ix in range(pdata.nx):
iline=istart+ia*pdata.nx+ix
vals=np.array([float(x) for x in lines[iline].split()])
fsize[ia,ix,:]=vals[:]
return fsize
[docs]
def read_dust_charge(directory: str, filename: str ="dust_charge.out") -> tuple[NDArray, NDArray,NDArray]:
'''
Reads some dust charge information, if dust charge chemistry was used.
Parameters
----------
directory :
The directory where the file is located (the |prodimo| model directory).
filename :
The filename. Default: "dust_charge.out"
Returns
-------
tuple of NDArray
mean dustcharc, minimum dust charge and maximum dustcharge
'''
# is not required so check if it is there
if not os.path.isfile(os.path.join(directory,filename)):
return None,None,None
f, dummy = _getfile(filename, directory)
if f is None:
return None,None,None
line=f.readline()
nx,nz,nsize=line.split()
nx=int(nx)
nz=int(nz)
nsize=(nsize)
mean=np.zeros(shape=(nx,nz))
min=np.zeros(shape=(nx,nz))
max=np.zeros(shape=(nx,nz))
for arr in [mean,min,max]:
for iz in range(nz-1,-1,-1):
for ix in range(nx):
arr[ix,iz]=_convToFloatCheck(f.readline().split()[-1],ix,iz,"dust charge")
return mean,min,max
[docs]
def calc_NHrad_oi(data):
"""
Calculates the radial column density from out to in (border of model space to star/center)
.. todo::
- is this really needed, should rather be a method within :class:`prodimopy.read.Data_ProDiMo`
"""
NHradoi = 0.0 * data.nHtot
for ix in range(data.nx - 2, 1, -1): # first ix point (ix=0= remains zero
r1 = (data.x[ix + 1, :] ** 2 + data.z[ix + 1, :] ** 2) ** 0.5
r2 = (data.x[ix, :] ** 2 + data.z[ix, :] ** 2) ** 0.5
dr = r1 - r2
dr = dr * u.au.to(u.cm)
nn = 0.5 * (data.nHtot[ix + 1, :] + data.nHtot[ix, :])
NHradoi[ix, :] = NHradoi[ix + 1, :] + nn * dr
return NHradoi
[docs]
def calc_surfd(data):
"""
Caluclates the gas and dust vertical surface densities at every point in the
model.
Only one half of the disk is considered. If one needs the total surface density
simply multiply the value from the midplane (zidx=0) by two.
"""
print("INFO: Calculate surface densities")
tocm = (1.0 * u.au).to(u.cm).value
data._sdg = 0.0 * data.rhog
data._sdd = 0.0 * data.rhod
for ix in range(data.nx):
for iz in range(data.nz - 2, -1, -1): # from top to bottom
dz = data.z[ix, iz + 1] - data.z[ix, iz]
dz = dz * tocm
nn = 0.5 * (data.rhog[ix, iz + 1] + data.rhog[ix, iz])
data._sdg[ix, iz] = (
data._sdg[
ix,
iz + 1,
]
+ nn * dz
)
nn = 0.5 * (data.rhod[ix, iz + 1] + data.rhod[ix, iz])
data._sdd[ix, iz] = (
data._sdd[
ix,
iz + 1,
]
+ nn * dz
)
def _calc_vol(data):
"""
Inits the vol field (:attr:`~prodimpy.read.Data_ProDiMo._vol` for each individual grid point.
This is useful to estimate mean quantities which are weighted by volume
but also by mass.
The routine follows the same method as in the prodimo.pro (the IDL skript)
"""
print("INFO: Calculate volumes")
tocm = (1.0 * u.au).to(u.cm).value
data._vol = np.zeros(shape=(data.nx, data.nz))
fourthreepi = 4.0 * math.pi / 3.0
for ix in range(data.nx):
ix1 = np.max([0, ix - 1])
ix2 = ix
ix3 = np.min([data.nx - 1, ix + 1])
x1 = math.sqrt(data.x[ix1, 0] * data.x[ix2, 0]) * tocm
x2 = math.sqrt(data.x[ix2, 0] * data.x[ix3, 0]) * tocm # does not depend on iz
for iz in range(data.nz):
iz1 = np.max([0, iz - 1])
iz2 = iz
iz3 = np.min([data.nz - 1, iz + 1])
tanbeta1 = 0.5 * (data.z[ix, iz1] + data.z[ix, iz2]) / data.x[ix, 0]
tanbeta2 = 0.5 * (data.z[ix, iz2] + data.z[ix, iz3]) / data.x[ix, 0]
data.vol[ix, iz] = fourthreepi * (x2**3 - x1**3) * (tanbeta2 - tanbeta1)
# Test for volume stuff ... total integrated mass
# mass=np.sum(np.multiply(data.vol,data.rhog))
# print("Total gas mass",(mass*u.g).to(u.M_sun))
def _calc_cdnmol(data):
"""
Calculated the vertical column number densities for every species
at every point in the disk (from top to bottom). Very simple and rough method.
Only one half of the disk is considered. If one needs the total surface density
simply multiply the value from the midplane (zidx=0) by two.
"""
if data._log:
print("INFO: Calculate vertical column densities")
tocm = data._pAUtocm
data._cdnmol = 0.0 * data.nmol
for ix in range(data.nx):
for iz in range(data.nz - 2, -1, -1): # from top to bottom
dz = data.z[ix, iz + 1] - data.z[ix, iz]
dz = dz * tocm
nn = 0.5 * (data.nmol[ix, iz + 1, :] + data.nmol[ix, iz, :])
data._cdnmol[ix, iz, :] = data._cdnmol[ix, iz + 1, :] + nn * dz
def _calc_rcdnmol(data):
"""
Calculated the radial column number densities for every species
at every point in the disk. Very simple and rough method.
FIXME: this can be quite inaccurate as the x info for the innermost points is not printed accurately to PRoDiMo.out which cause dr to be zero
"""
if data._log:
print("INFO: Calculate radial column densities")
tocm = (1.0 * u.au).to(u.cm).value
data._rcdnmol = 0.0 * data.nmol
for iz in range(data.nz):
for ix in range(1, data.nx, 1): # first ix point (ix=0= remains zero
r1 = (data.x[ix, iz] ** 2 + data.z[ix, iz] ** 2) ** 0.5
r2 = (data.x[ix - 1, iz] ** 2 + data.z[ix - 1, iz] ** 2) ** 0.5
dr = r1 - r2
dr = dr * tocm
nn = 0.5 * (data.nmol[ix, iz, :] + data.nmol[ix - 1, iz, :])
data._rcdnmol[ix, iz, :] = data._rcdnmol[ix - 1, iz, :] + nn * dr
# FIXME: test the integration error can be at most 16% ... good enough for now (most fields are better)
# nHverC=data._rcdnmol[:,:,data.spnames["H"]]+data._rcdnmol[:,:,data.spnames["H+"]]+data._rcdnmol[:,:,data.spnames["H2"]]*2.0
# izt=data.nz-2
# print(nHverC[:,izt],data.NHrad[:,izt])
# print(np.max(np.abs(1.0-nHverC[1:,:]/data.NHrad[1:,:])))
def _flux_Wm2toJykms(flux, frequency):
"""
Converts a flux from W m^-2 to Jy km/s
Parameters
----------
flux: float
the flux in units of [W m^-2]
frequency: float
the frequency of the flux in [GHz]
Returns
-------
float
The flux of the line. `UNIT: Jansky km s^-1`
"""
res = flux * u.Watt / (u.m**2.0)
ckm = const.c.to("km/s")
res = (res).to(u.Jansky, equivalencies=u.spectral_density(frequency * u.GHz))
return (res * ckm).value
def _getfile(filename: str, directory: str | None = None, binary: bool = False, suppressMsg: bool = False):
"""
Utility function to open a particular file from a |prodimo| model.
Parameters
----------
filename :
The name of the file to open.
directory :
The directory where the file is located. Default is `None`.
binary :
If `True`, opens the file in binary mode. Default is `False`.
suppressMsg :
If `True`, suppresses the READ file message. Default is `False`.
.. todo::
- Add parameter `required` to raise throw an exception if a necessary file is not found.
- check if that still works with incomplete models (e.g. filled data structure by hand)
"""
pfilename = filename
if directory is not None:
pfilename = os.path.join(directory, filename)
try:
if binary:
f = open(pfilename, "rb")
else:
f = open(pfilename, "r")
except: # noqa: E722
f = None
if f is None:
print(("WARN: Could not open " + pfilename + "!"))
else:
if not suppressMsg:
print(f"READ: {filename} ...")
return f, pfilename
[docs]
@deprecated("Use the function prodimopy.read.Data_ProDiMo.analyse_chemistry instead.")
def analyse_chemistry(
species: str,
model: Data_ProDiMo,
to_txt=True,
td_fileIdx: str | int = None,
filenameChemistry="chemanalysis.out",
screenout=True,
) -> Chemistry:
"""
.. deprecated:: 2.5
Use :func:`prodimopy.read.Data_ProDiMo.analyse_chemistry` instead.
Function that analyses the chemistry in a similar way to chemanalysis.pro.
Parameters
----------
species : str
The species name one wants to analyze.
model : :class:`~prodimopy.read.Data_ProDiMo`
the |prodimo| model data.
to_txt : boolean
Write info about formation and destruction reactions for the selecte molecule
to a txt file. Default: `True`
td_fileIdx : str or int
For time-dependent chemanalysis. Provide here the idx for the timestep.
E.g. `"0001"` for the first one.
filenameChemistry : str
The name of the file that holds the reaction rates. Default: `chemanalysis.out`.
Just in case it has a different name, usually one does not need to change that.
screenout : boolean
If `True` all the form./dest. reactions are printed on the screen. Default: `True`
Returns
-------
:class:`prodimopy.read.Chemistry`
Object that holds all the required information and can be use for the plotting routines,
and to analyse the chemistry point by point `func:~pread.Chemistry.reac_rates_ix_iz`.
"""
import astropy.io.fits as fits
chemistry = Chemistry(model.name)
start = timer()
# stores all the formation reaction indices and rates for each grid point
gridf = np.empty((model.nx, model.nz, 2), dtype=np.ndarray)
gridd = np.empty((model.nx, model.nz, 2), dtype=np.ndarray)
totfrate = np.zeros((model.nx, model.nz)) # total formation rate at each point)
totdrate = np.zeros((model.nx, model.nz)) # total formation rate at each point)
fidx = list() # indices of all unique formation reactions
didx = list() # indices of all unique destruction reactions
# check if already a fits file with the given.out name exits
fnamefits = filenameChemistry.replace(".out", ".fits")
if td_fileIdx is not None:
ext = _td_fileIdx_ext(td_fileIdx)
fnamefits = fnamefits.replace(".fits", ext + ".fits")
if os.path.isfile(os.path.join(model.directory, fnamefits)):
# if so, use it
print("INFO: Using existing fits file for chemanalysis: ", fnamefits)
filenameChemistry = fnamefits
else:
print("INFO: convert existing txt format file to fits (that can take a bit) ... ")
# create an array that holds the reaction rates
rates = np.zeros(
shape=(model.nx, model.nz, len(model.chemnet.reactions)), dtype=np.float32
)
if td_fileIdx is not None:
# if it is a time-dependent model, the filenameChemistry is different
# not nice, but ext should be set already
filenameChemistry = filenameChemistry.replace(".out", ext + ".out")
fc = open(os.path.join(model.directory, filenameChemistry), "r")
fc.readline() # skip the first line
for line in fc:
ix, iz, dummy, ireac, rate = line.split()
rates[int(ix) - 1, int(iz) - 1, int(ireac) - 1] = float(rate)
fc.close()
hdu = fits.PrimaryHDU(
data=rates.T
) # transpose to have it in the same order as it would be from prodimo
hdu.writeto(os.path.join(model.directory, fnamefits), overwrite=True)
filenameChemistry = fnamefits
# that gives me simply all formation and desctruction reactions for the given species
# those include the real ids (e.g. starts at 1) and not the zero-based python indices!!
fidx = np.array([x.id for x in model.chemnet.reactions if species in x.products])
didx = np.array([x.id for x in model.chemnet.reactions if species in x.reactants])
# now read all info from the fits file
cfits = fits.open(
os.path.join(model.directory, filenameChemistry), do_not_scale_image_data=True, memmap=True
)
formrates = cfits[0].data[fidx - 1, :, :] # is read in reversed order for the dims
destrates = cfits[0].data[didx - 1, :, :]
formrates = formrates.T # transpose it to have it in nx,nz,ncreac ...
destrates = destrates.T # transpose it to have it in nx,nz,ncreac ...
cfits.close()
# sorted indices for the formation and destruction rates
formridx = np.flip(np.argsort(formrates, axis=2), axis=2) # reversed order
destridx = np.flip(np.argsort(destrates, axis=2), axis=2) # reversed order
for ix in range(model.nx):
for iz in range(model.nz):
gridf[ix, iz, 0] = fidx[formridx[ix, iz, :]] # formation reaction indices, sorted
gridf[ix, iz, 1] = formrates[ix, iz, formridx[ix, iz, :]]
gridd[ix, iz, 0] = didx[destridx[ix, iz, :]] # destruction reaction indices, sorted
gridd[ix, iz, 1] = destrates[ix, iz, destridx[ix, iz, :]]
totfrate[:, :] = np.sum(formrates, axis=2) # total formation rate at each point
totdrate[:, :] = np.sum(destrates, axis=2) # total destruction rate at each point
chemistry.species = species
chemistry.gridf = gridf
chemistry.gridd = gridd
chemistry.fridxs = fidx
chemistry.dridxs = didx
chemistry.totfrate = totfrate
chemistry.totdrate = totdrate
# reference to the model
chemistry.model = model
if to_txt:
output_chem_fname = os.path.join(
model.directory, "chemistry_reactions_" + species + ".txt"
)
if td_fileIdx is not None:
output_chem_fname = output_chem_fname.replace(".txt", ext + ".txt")
f = open(output_chem_fname, "w")
f.writelines("-------------------------------------------------------\n")
f.writelines("formation and destruction reactions \n")
f.writelines("species: " + species + "\n\n")
f.writelines("Formation reactions\n")
for i, ridx in enumerate(chemistry.fridxs):
f.writelines(chemistry.reac_to_str(model.chemnet.reactions[ridx - 1], i + 1))
f.writelines("\n")
f.writelines("\n\n")
f.writelines("Destruction reactions\n")
for i, ridx in enumerate(chemistry.dridxs):
f.writelines(chemistry.reac_to_str(model.chemnet.reactions[ridx - 1], i + 1))
f.writelines("\n")
f.writelines("-------------------------------------------------------\n")
f.close()
print("Writing information to: " + output_chem_fname)
# also print it to stdout
if screenout:
with open(output_chem_fname) as f:
print(f.read())
print("INFO: Calc time: ", "{:4.2f}".format(timer() - start) + " s")
return chemistry
[docs]
@deprecated("Use the function prodimopy.read.Chemistry.reac_rates_ix_iz instead.")
def reac_rates_ix_iz(model, chemana, ix=None, iz=None, locau=None, lowRatesFrac=1.0e-3):
"""
.. deprecated:: 2.5
Use :func:`prodimopy.read.Chemistry.reac_rates_ix_iz` instead.
Function that analyses the chemana manually via a point-by-point
analysis for a given species. Shows the most important formation and destruction rates for the given point ix,iz (or in au via Parameter `locau`).
Parameters
----------
model : :class:`~prodimopy.read.Data_ProDiMo`
the model data
chemana : :class:`~prodimopy.read.Chemistry`
data resulting from :func:`~prodimopy.read.analyse_chemistry` on a single species
ix : int
ix corresponding to desired radial location in grid, starting at 0
iz : int
iz corresponding to desired radial location in grid, starting at 0
locau : array_like
the desired coordinates in au [x,z]. The routine then finds the closest
grid point for those coordinates.
lowRatesFrac : float
Only rates with rate/total_rate > lowRatesFrac are printed. Default: 1.e-3
This is useful to avoid printing all the reactions that are not important.
"""
if locau is not None:
ix = np.argmin(np.abs(model.x[:, 0] - locau[0]))
iz = np.argmin(np.abs(model.z[ix, :] - locau[1]))
if ix is None or iz is None:
print("Please provide either ix,iz or locau!")
return
print("Analysing point x=", str(model.x[ix, iz]) + " au z=" + str(model.z[ix, iz]) + " au")
print(" Detailed reaction rates for: %10s" % chemana.species)
print(
"------------------------------------------------------------------------------------------------------"
)
print(" grid point = %i %i" % (ix, iz))
print(" r,z [au] (cylindrical) = %.3f %.4f" % (model.x[ix, iz], model.z[ix, iz]))
print(" n<H>,nd [cm^-3] = %.1e %.1e" % (model.nHtot[ix, iz], model.nd[ix, iz]))
print(" Tgas,Tdust [K] = %.1e %.1e" % (model.tg[ix, iz], model.td[ix, iz]))
print(
" AV_rad,AV_ver = %.1e %.1e" % (model.AVrad[ix, iz], model.AVver[ix, iz])
)
print(
" %10s" % chemana.species
+ " abundance = %e" % model.getAbun(chemana.species)[ix, iz]
)
print(
"------------------------------------------------------------------------------------------------------"
)
print(" Total form. rate [cm^-3 s^-1] = {:10.2e}".format(chemana.totfrate[ix, iz]))
for i, ridx in enumerate(chemana.gridf[ix, iz, 0]):
rate = chemana.gridf[ix, iz, 1][i]
if rate / chemana.totfrate[ix, iz] > lowRatesFrac: # don't print low rates
print(
chemana.reac_to_str(
model.chemnet.reactions[ridx - 1],
list(chemana.fridxs).index(ridx) + 1,
rate=rate,
)
)
print(
"------------------------------------------------------------------------------------------------------"
)
print(" Total dest. rate [cm^-3 s^-1] = {:10.2e}".format(chemana.totdrate[ix, iz]))
for i, ridx in enumerate(chemana.gridd[ix, iz, 0]):
rate = chemana.gridd[ix, iz, 1][i]
if rate / chemana.totdrate[ix, iz] > lowRatesFrac: # don't print unimportant rates
print(
chemana.reac_to_str(
model.chemnet.reactions[ridx - 1],
list(chemana.dridxs).index(ridx) + 1,
rate=rate,
)
)
print(
"------------------------------------------------------------------------------------------------------"
)
print("")