"""
.. module:: run_slab
:synopsis: Python version of slab models
.. moduleauthor:: A. M. Arabhavi
"""
import numpy as np
import pandas as pd
import prodimopy.hitran as ht
import prodimopy.read_slab as rs
from astropy.io import fits
from astropy.convolution import Gaussian1DKernel
from astropy.convolution import convolve as apy_convolve
from scipy.interpolate import RegularGridInterpolator
import hapi as hp
from scipy.interpolate import interp1d
import prodimopy.read as pread
from tqdm.auto import tqdm
import os
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from scipy.constants import c,h,G
from scipy.constants import k as kb
from scipy.constants import astronomical_unit as au
from scipy.constants import parsec as pc
from scipy.constants import atomic_mass as amu
from astropy.constants import M_sun
from astropy import units as u
percm2_to_perm2 = 1e4
gpercm3_to_kgperm3 = 1e3
cm2perg_to_m2perkg = 1e-1
percm3_to_perm3 = 1e6
cm_to_m = 1e-2
um_to_m = 1e-6
kmps = rs.kmps
um = ((1*u.micron).to(u.m)).value
cm_K = c*h/kb*1e2
jy = (1*u.Jy).to(u.kg/u.s**2).value
mu = 2.3
M_sun = M_sun.si.value
[docs]
def read_partition_sums(custom_partition_sum_file):
try:
custom_partition_sums = np.loadtxt(custom_partition_sum_file)
except:
try:
custom_partition_sums = np.loadtxt(custom_partition_sum_file,delimiter=',')
except:
raise ValueError(f'Cannot read {custom_partition_sum_file}')
return(interp1d(custom_partition_sums[:,0],custom_partition_sums[:,1]))
QT_mol_id = {name: mol_id for mol_id, name in set([(i[1][0],i[1][-1]) for i in list(hp.ISO_ID.items())])}
[docs]
def parse_paired_rules(selection, Nspecies, arg_name="selection", expected_types=(int, float)):
"""
Normalizes flexible paired inputs (ranges or string pairs) into a strict format:
[ [ [val1, val2], ... ], [ [val1, val2], ... ], ... ] of length Nspecies.
"""
if selection is None:
return [None] * Nspecies
if isinstance(selection, np.ndarray):
selection = selection.tolist()
if not isinstance(selection, list):
raise ValueError(f"{arg_name} must be a list, numpy array, or None!")
if len(selection) == 0:
return [None] * Nspecies
def is_pair(item):
return isinstance(item, list) and len(item) == 2 and all(isinstance(x, expected_types) for x in item)
if is_pair(selection):
return [[selection]] * Nspecies
if all(is_pair(item) for item in selection):
if len(selection) == Nspecies:
return [[item] for item in selection]
else:
return [selection] * Nspecies
def is_species_selection(item):
if item is None: return True
if not isinstance(item, list): return False
if is_pair(item): return True
return all(is_pair(sub) for sub in item)
if all(is_species_selection(item) for item in selection):
if len(selection) == 1:
val = selection[0]
common = [val] if is_pair(val) else val
return [common] * Nspecies
elif len(selection) == Nspecies:
return [[item] if is_pair(item) else item for item in selection]
else:
raise ValueError(f"Outer list length for {arg_name} must be 1 or {Nspecies}!")
raise ValueError(f"Unrecognized format for {arg_name}.")
[docs]
def run_0D_slab(Ng,Tg,vturb,line_selection_file='',molecule='',cdn_scale_fac=[1],mol_mass=None,HITRANfile='',ESelection=None,bandSelection=None,hitran_min_strength=1e-99,custom_partition_sum_file='',isotopolog=[1],waveSelection=[4,30],wave_spec=[4.9,28],R_grid=1e5,output='both',output_filename='default.out',mode='both',convolve_R=None,verbose=True):
"""
Simulates 0D slab models for molecular emission/absorption spectra.
This is a Python implementation of 0D slab models based on the FORTRAN version
in ProDiMo (https://prodimo.iwf.oeaw.ac.at/). It runs models with or without
line overlap. For running large grids of models, run_0D_slab_grid() or the Fortran
version is recommended as they support OpenMP for better performance.
Parameters
----------
Ng : float
Gas column density in cm^{-2}.
Tg : float
Gas temperature in Kelvin (K).
vturb : float
Turbulent velocity of the gas in km/s.
line_selection_file : str, optional
Path to a pre-defined line selection rules file. If provided, overrides
arguments: `molecule`, `mass`, `HITRANfile`, `ESelection`, `bandSelection`,
`hitran_min_strength`, `custom_partition_sum_file`, `isotopolog`, and `waveSelection`.
Default is ''.
molecule : str or list of str, optional
Molecule name(s) (e.g., 'CO2' or ['CO2', 'C2H2', 'HCN']). Default is ''.
cdn_scale_fac : list of float, optional
Scaling factors for the column density of each molecule. Useful for
applying isotopologue abundance ratios. If the same size as number of species
in `line_selection_file`, it overrides values from `line_selection_file`. Default is [1].
mol_mass : float or list of float, optional
Molecular mass(es) in atomic mass units (amu). Required if
`line_selection_file` is not provided. Default is None.
HITRANfile : str or list of str, optional
Path(s) to the HITRAN '.par' file(s) containing the line data. Default is ''.
ESelection : list or None, optional
Upper energy level limits for line selection. Can be a single range
(e.g., [0, 4000]) or a nested list of ranges per species. Default is None.
bandSelection : list or None, optional
Upper and lower level band quantum number notations (strings) to select.
e.g., ['0 0 1 0 A*', '0 0 0 0 A*']. Default is None.
hitran_min_strength : float or list of float, optional
Minimum line strength threshold to read from the HITRAN file. Default is 1e-99.
custom_partition_sum_file : str or list of str, optional
Path(s) to file(s) containing custom partition sums (two columns:
temperature, partition sums). Default is ''.
isotopolog : list of int, optional
List containing HITRAN isotopologue IDs for the selected species. Default is [1].
waveSelection : list, optional
Wavelength limits in microns to restrict the lines selected from the
HITRAN file. Default is [4, 30].
wave_spec : list, optional
Wavelength limits in microns for calculating the final output spectra.
Default is [4.9, 28].
R_grid : float, optional
Resolving power of the high-resolution grid on which the spectra is
initially calculated. A minimum of 1e5 is recommended. Default is 1e5.
output : {'file', 'return', 'both'}, optional
Defines how the output is handled. 'file' writes to disk, 'return' outputs
the slab object to the Python environment, and 'both' does both. Default is 'both'.
output_filename : str, optional
Filename and path for the output file if `output` writes to disk.
Default is 'default.out'.
mode : {'overlap', 'line_by_line', 'both'}, optional
Calculates the spectra considering line overlap, strictly line-by-line,
or both. Default is 'both'.
convolve_R : float or None, optional
Spectral resolving power to which the final output spectra should be
convolved (e.g., 3500 for JWST/MIRI). If None, no convolution is applied.
Default is None.
verbose : bool, optional
If True, prints progress logs during calculation. Default is True.
Returns
-------
slab : object or tuple
The calculated slab object containing the spectra and optical depth data.
Returns an empty tuple `()` if `output` is set to 'file'.
Raises
------
TypeError
If `output_filename` or `custom_partition_sum_file` are of incorrect types.
ValueError
If the lengths of species-specific lists (masses, HITRAN files, scaling
factors, etc.) do not match the number of molecules provided.
"""
nu_axis = rs.generate_grid(R=R_grid,lambda_0=wave_spec[0],lambda_n=wave_spec[1],sampling=1)*1e9
N_nu = len(nu_axis)
W_grid = c / nu_axis / um
nu_grid = nu_axis.reshape(1, 1, N_nu)
W_grid = W_grid.reshape(1, 1, N_nu)
Ng *= percm2_to_perm2
vturb *= kmps
if not isinstance(output_filename, str):
raise TypeError("output_filename should be a string")
N_sp_lines,mol_data,N_lines,custom_partition_sum_int = [],[],[],[]
rules = []
if line_selection_file == '':
if isinstance(molecule,str):
molecule = [molecule]
Nspecies = 1
Nspecies = len(molecule)
if len(cdn_scale_fac)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of column density scaling factors ({len(cdn_scale_fac)}) supplied are not same!')
if mol_mass == None:
raise ValueError(f"The molecular mass(es) should be provided when line selection file is not provided")
if not (isinstance(mol_mass,list) | isinstance(mol_mass,np.ndarray)):
mol_mass = [mol_mass]
if not (isinstance(cdn_scale_fac,list) | isinstance(cdn_scale_fac,np.ndarray)):
cdn_scale_fac = [cdn_scale_fac]
if not (isinstance(isotopolog,list) | isinstance(isotopolog,np.ndarray)):
isotopolog = [isotopolog]
if isinstance(HITRANfile,str):
HITRANfile = [HITRANfile]
if not isinstance(hitran_min_strength, (list, np.ndarray)):
hitran_min_strength = [hitran_min_strength]
if isinstance(hitran_min_strength, np.ndarray):
hitran_min_strength = hitran_min_strength.tolist()
if len(hitran_min_strength) != Nspecies:
hitran_min_strength = hitran_min_strength * Nspecies
if isinstance(custom_partition_sum_file,str):
custom_partition_sum_file = [custom_partition_sum_file]
if len(custom_partition_sum_file) != Nspecies:
custom_partition_sum_file *= Nspecies
if len(mol_mass)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of molecular masses ({len(mol_mass)}) supplied are not same!')
if len(HITRANfile)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of HITRAN filepaths ({len(HITRANfile)}) supplied are not same!')
if len(isotopolog)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of HITRAN isotopologue IDs ({len(isotopolog)}) supplied are not same!')
waveSelection = parse_paired_rules(waveSelection, Nspecies, arg_name="waveSelection", expected_types=(int, float))
ESelection = parse_paired_rules(ESelection, Nspecies, arg_name="ESelection", expected_types=(int, float))
bandSelection = parse_paired_rules(bandSelection, Nspecies, arg_name="bandSelection", expected_types=(str,))
for isp in range(Nspecies):
if custom_partition_sum_file[isp] == None:
custom_partition_sum_file[isp] = ''
rule = ht.selection_rules()
rule.name = molecule[isp].split('_')[0]
rule.iso = isotopolog[isp]
rule.mass = mol_mass[isp]
rule.file = HITRANfile[isp]
rule.abundance_fraction = cdn_scale_fac[isp]
rule.waveSelection = waveSelection[isp]
rule.ESelection = ESelection[isp]
rule.bandSelection = bandSelection[isp]
rule.hitran_min_strength = hitran_min_strength[isp]
rule.custom_partition_sum = custom_partition_sum_file[isp]
rules.append(rule)
else:
rules = ht.read_line_selection(path=line_selection_file)
Nspecies = len(rules)
molecule = [rule.name for rule in rules]
mol_mass = [rule.mass for rule in rules]
cdn_scale_fac = [rule.abundance_fraction for rule in rules]
isotopolog = [rule.iso for rule in rules]
nu_0_list, Aul_list, gu_list, gl_list, E_u_list, E_l_list = [], [], [], [], [], []
for isp in range(Nspecies):
mol_data.append(ht.read_hitran_from_rules(rules[isp]))
N_sp_lines.append(len(mol_data[isp]))
lambda_0 = np.array(mol_data[isp]['lambda']) * um
nu_0_list.append((c / lambda_0).reshape(-1))
Aul_list.append(np.array(mol_data[isp]['A']).reshape(-1))
gu_list.append(np.array(mol_data[isp]['g_u']).reshape(-1))
gl_list.append(np.array(mol_data[isp]['g_l']).reshape(-1))
E_u_list.append(np.array(mol_data[isp]['E_u']).reshape(-1))
E_l_list.append(np.array(mol_data[isp]['E_l']).reshape(-1))
custom_partition_sum_int.append(None)
if not(isinstance(rules[isp].custom_partition_sum,str) or rules[isp].custom_partition_sum == None):
raise TypeError("Expected 'string' type for custom_partition_sum_file")
if rules[isp].custom_partition_sum == None:
rules[isp].custom_partition_sum = ''
if rules[isp].custom_partition_sum != '':
custom_partition_sum_int[isp] = read_partition_sums(rules[isp].custom_partition_sum)
nu_0 = np.concatenate(nu_0_list)
Aul = np.concatenate(Aul_list)
gu = np.concatenate(gu_list)
gl = np.concatenate(gl_list)
E_u = np.concatenate(E_u_list)
E_l = np.concatenate(E_l_list)
I_nu_line, tau_ret, I_nu, tau_lbl, pop_l, pop_u = run_0D_point(Ng,Tg,vturb,molecule,mol_mass,cdn_scale_fac,N_sp_lines,nu_0,Aul,E_u,E_l,gu,gl,isotopolog,nu_axis,mode,custom_partition_sum_int)
slab = convert_to_slab_format(nu_grid,I_nu_line,tau_ret,I_nu,tau_lbl,R_grid,Ng,Tg,cdn_scale_fac,vturb,pop_l,pop_u,mol_data,molecule,N_sp_lines,mode)
if output in ['file', 'both']:
if mode in ['overlap','both']:
slab[0].write_to_file(output_filename, 'overlap', verbose)
if mode in ['line_by_line','both']:
slab.write_to_file(output_filename, 'line_by_line', verbose)
if output in ['return', 'both']:
if not (convolve_R is None):
slab.convolve(R=convolve_R,lambda_0=wave_spec[0],lambda_n=wave_spec[1],overlap= True if mode in ['overlap','both'] else False,verbose=verbose)
return(slab)
return()
def _worker_slab(indexed_params, molecule, mol_mass, cdn_scale_fac, N_sp_lines, nu_0, Aul, E_u, E_l, gu, gl,
isotopolog, nu_axis, mode, custom_partition_sum_int,
nu_grid, R_grid, mol_data, output, output_directory, base_filename, verbose, convolve_R, wave_spec):
idx, (Ng_val, Tg_val, vturb_val) = indexed_params
current_filename = f"{base_filename}_{idx+1:04d}"
current_filename = os.path.join(output_directory,current_filename)
I_nu_line, tau_ret, I_nu, tau_lbl, pop_l, pop_u = run_0D_point(Ng_val, Tg_val, vturb_val, molecule, mol_mass, cdn_scale_fac, N_sp_lines, nu_0, Aul, E_u, E_l, gu, gl, isotopolog, nu_axis, mode, custom_partition_sum_int)
slab = convert_to_slab_format(nu_grid, I_nu_line, tau_ret, I_nu, tau_lbl, R_grid, Ng_val, Tg_val, cdn_scale_fac, vturb_val, pop_l, pop_u, mol_data, molecule, N_sp_lines, mode)
if output in ['file', 'both']:
if mode in ['overlap','both']:
slab[0].write_to_file(current_filename, 'overlap', verbose)
if mode in ['line_by_line','both']:
slab.write_to_file(current_filename, 'line_by_line', verbose)
if output in ['return', 'both'] and convolve_R is not None:
slab.convolve(R=convolve_R, lambda_0=wave_spec[0], lambda_n=wave_spec[1], overlap=True if mode in ['overlap', 'both'] else False, verbose=verbose)
return slab if output in ['return', 'both'] else None
[docs]
def run_0D_slab_grid(Ng, Tg, vturb, line_selection_file='', molecule='', mol_mass=None, HITRANfile='',
ESelection=None, bandSelection=None, hitran_min_strength=1e-99, cdn_scale_fac=[1],
custom_partition_sum_file='', isotopolog=[1], waveSelection=[4,30], wave_spec=[4.9,28],
R_grid=1e5, output='both', output_directory='.', output_filename_prefix='default',
mode='both', convolve_R=None, write_index_file=True, number_of_cores=1, verbose=True):
'''
This is a python implementation of 0D slab models, based on the FORTRAN version of 0D slab models in ProDiMo (https://prodimo.iwf.oeaw.ac.at/)
This runs models with/without line overlap. More robust and efficient code is available on Fortran, please check the above website (especially for running large grids of models).
The Fortran code also supports OMP and supports including multiple species.
Ng : list or numpy.ndarray
Gas column density (cm-2)
Tg: list or numpy.ndarray
Gas temperature (K)
vturb : list or numpy.ndarray
Turbulent velocity of the gas (km/s)
line_selection_file : str, optional
Path to a pre-defined line selection rules file. If provided, overrides
arguments: `molecule`, `mass`, `HITRANfile`, `ESelection`, `bandSelection`,
`hitran_min_strength`, `custom_partition_sum_file`, `isotopolog`, and `waveSelection`.
Default is ''.
molecule : string or list of strings, optional
Molecule name, e.g. 'CO2' or ['CO2','C2H2','HCN']
mol_mass : float or list of floats, optional
Molecular mass(es) in atomic mass units (amu)
HITRANfile : string or list of strings, optional
Path(s) to the HITRAN '.par' file(s) containing the line data downloaded from HITRAN
ESelection : list or None, optional
Upper energy level limits for line selection.
bandSelection : list or None, optional
Upper and lower level band quantum number notations (strings) to select.
hitran_min_strength : float or list of float, optional
Minimum line strength threshold to read from the HITRAN file, based on Woitke+2018, Eq. 2.
Default is 1e-99.
cdn_scale_fac : list of float, optional
Scaling factors for the column density of each molecule. Useful for
applying isotopologue abundance ratios. If the same size as number of species
in `line_selection_file`, it overrides values from `line_selection_file`. Default is [1].
custom_partition_sum_file : string or list of strings, optional
Path pointing to file containing custom partition sum
isotopolog : list, optional
List containing isotopologue numbers (HITRAN format)
waveSelection : list, optional
Wavelength limits in microns to select the lines from the HITRAN file
wave_spec : list, optional
Wavelength limits in microns for calculating the spectra
R_grid : float, optional
High resolution grid on which the spectra has to be calculated initially
output : string, optional
'file' to write output to file, 'return' to return a slab type, 'both' for both
output_directory : string, optional
Directory path where the output files should be written
output_filename_prefix : string, optional
Filename prefix for the output file without file extension
mode : string, optional
'overlap' or 'line_by_line' or 'both'
convolve_R : float, optional
Spectral resolving power to which the spectra has to be convolved
write_index_file : boolean, optional
Write an index file containing the index number of each file
number_of_cores : int or string, optional
Number of cores to use to parallelize the grid run.
verbose : boolean, optional
Whether or not to print progress log
'''
if not isinstance(output_filename_prefix, str):
raise TypeError("output_filename_prefix should be a string")
Ng_internal = np.array(Ng) * percm2_to_perm2
vturb_internal = np.array(vturb) * kmps
parameter_grid = list(enumerate(zip(Ng_internal, Tg, vturb_internal)))
if write_index_file:
index_data = np.array([np.arange(len(Ng)) + 1, Ng, Tg, vturb]).T
index_filename = os.path.join(output_directory, f"{output_filename_prefix}_index.txt")
np.savetxt(index_filename, index_data, fmt=['%05d', '%.4e', '%.2f', '%.2f'], header='# Index | Ng [cm-2] | Tg [K] | vturb [km/s]', delimiter=' ')
N_sp_lines, mol_data, N_lines, custom_partition_sum_int = [], [], [], []
rules = []
if line_selection_file == '' or line_selection_file is None:
if isinstance(molecule, str):
molecule = [molecule]
Nspecies = len(molecule)
if not isinstance(mol_mass, (list, np.ndarray)): mol_mass = [mol_mass]
if not isinstance(cdn_scale_fac, (list, np.ndarray)): cdn_scale_fac = [cdn_scale_fac]
if not isinstance(isotopolog, (list, np.ndarray)): isotopolog = [isotopolog]
if isinstance(HITRANfile, str): HITRANfile = [HITRANfile]
if not isinstance(hitran_min_strength, (list, np.ndarray)):
hitran_min_strength = [hitran_min_strength]
if isinstance(hitran_min_strength, np.ndarray):
hitran_min_strength = hitran_min_strength.tolist()
if len(hitran_min_strength) != Nspecies:
hitran_min_strength = hitran_min_strength * Nspecies
if isinstance(custom_partition_sum_file, str):
custom_partition_sum_file = [custom_partition_sum_file]
if len(custom_partition_sum_file) != Nspecies:
custom_partition_sum_file *= Nspecies
if mol_mass[0] is None:
raise ValueError(f"The molecular mass(es) should be provided when line selection file is not provided")
if len(mol_mass) != Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of molecular masses ({len(mol_mass)}) supplied are not same!')
if len(HITRANfile) != Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of HITRAN filepaths ({len(HITRANfile)}) supplied are not same!')
if len(isotopolog) != Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of HITRAN isotopologue IDs ({len(isotopolog)}) supplied are not same!')
if len(cdn_scale_fac) != Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of column density scaling factors ({len(cdn_scale_fac)}) supplied are not same!')
waveSelection = parse_paired_rules(waveSelection, Nspecies, arg_name="waveSelection", expected_types=(int, float))
ESelection = parse_paired_rules(ESelection, Nspecies, arg_name="ESelection", expected_types=(int, float))
bandSelection = parse_paired_rules(bandSelection, Nspecies, arg_name="bandSelection", expected_types=(str,))
for isp in range(Nspecies):
if custom_partition_sum_file[isp] is None:
custom_partition_sum_file[isp] = ''
rule = ht.selection_rules()
rule.name = molecule[isp].split('_')[0]
rule.iso = isotopolog[isp]
rule.mass = mol_mass[isp]
rule.file = HITRANfile[isp]
rule.abundance_fraction = cdn_scale_fac[isp]
rule.waveSelection = waveSelection[isp]
rule.ESelection = ESelection[isp]
rule.bandSelection = bandSelection[isp]
rule.hitran_min_strength = hitran_min_strength[isp]
rule.custom_partition_sum = custom_partition_sum_file[isp]
rules.append(rule)
else:
rules = ht.read_line_selection(path=line_selection_file)
Nspecies = len(rules)
molecule = [rule.name.split('_')[0] for rule in rules]
mol_mass = [rule.mass for rule in rules]
file_abundances = [rule.abundance_fraction for rule in rules]
if cdn_scale_fac is None:
cdn_scale_fac = file_abundances
elif isinstance(cdn_scale_fac, (int, float, np.number)):
if Nspecies == 1:
cdn_scale_fac = [float(cdn_scale_fac)]
else:
cdn_scale_fac = file_abundances
elif isinstance(cdn_scale_fac, (list, np.ndarray)):
if isinstance(cdn_scale_fac, np.ndarray):
cdn_scale_fac = cdn_scale_fac.tolist()
if len(cdn_scale_fac) == Nspecies:
pass
else:
cdn_scale_fac = file_abundances
else:
cdn_scale_fac = file_abundances
for isp in range(Nspecies):
rules[isp].abundance_fraction = cdn_scale_fac[isp]
isotopolog = [rule.iso for rule in rules]
nu_axis = rs.generate_grid(R=R_grid, lambda_0=wave_spec[0], lambda_n=wave_spec[1], sampling=1) * 1e9
N_nu = len(nu_axis)
W_grid = c / nu_axis / um
nu_grid = nu_axis.reshape(1, 1, N_nu)
W_grid = W_grid.reshape(1, 1, N_nu)
nu_0_list, Aul_list, gu_list, gl_list, E_u_list, E_l_list = [], [], [], [], [], []
for isp in range(Nspecies):
if verbose: print(rules[isp])
mol_data.append(ht.read_hitran_from_rules(rules[isp]))
N_sp_lines.append(len(mol_data[isp]))
lambda_0 = np.array(mol_data[isp]['lambda']) * um
nu_0_list.append((c / lambda_0).reshape(-1))
Aul_list.append(np.array(mol_data[isp]['A']).reshape(-1))
gu_list.append(np.array(mol_data[isp]['g_u']).reshape(-1))
gl_list.append(np.array(mol_data[isp]['g_l']).reshape(-1))
E_u_list.append(np.array(mol_data[isp]['E_u']).reshape(-1))
E_l_list.append(np.array(mol_data[isp]['E_l']).reshape(-1))
custom_partition_sum_int.append(None)
if not (isinstance(rules[isp].custom_partition_sum, str) or rules[isp].custom_partition_sum is None):
raise TypeError("Expected 'string' type for custom_partition_sum_file")
if rules[isp].custom_partition_sum is None:
rules[isp].custom_partition_sum = ''
if rules[isp].custom_partition_sum != '':
custom_partition_sum_int[isp] = read_partition_sums(rules[isp].custom_partition_sum)
nu_0 = np.concatenate(nu_0_list)
Aul = np.concatenate(Aul_list)
gu = np.concatenate(gu_list)
gl = np.concatenate(gl_list)
E_u = np.concatenate(E_u_list)
E_l = np.concatenate(E_l_list)
func = partial(_worker_slab,
molecule=molecule, mol_mass=mol_mass, cdn_scale_fac=cdn_scale_fac, N_sp_lines=N_sp_lines, nu_0=nu_0, Aul=Aul,
E_u=E_u, E_l=E_l, gu=gu, gl=gl, isotopolog=isotopolog,
nu_axis=nu_axis, mode=mode,
custom_partition_sum_int=custom_partition_sum_int,
nu_grid=nu_grid, R_grid=R_grid, mol_data=mol_data,
output=output, output_directory=output_directory, base_filename=output_filename_prefix,
verbose=verbose, convolve_R=convolve_R, wave_spec=wave_spec)
if isinstance(number_of_cores, str):
total_cores = os.cpu_count() or 1
if number_of_cores == 'max':
n_workers = total_cores
elif number_of_cores == 'half':
n_workers = max(1, total_cores // 2)
elif number_of_cores == 'quarter':
n_workers = max(1, total_cores // 4)
else:
n_workers = 1
else:
n_workers = number_of_cores
with ProcessPoolExecutor(max_workers=n_workers) as executor:
results = list(executor.map(func, parameter_grid))
if output in ['return', 'both']:
return results
[docs]
def run_0D_point(Ng,Tg,vturb,molecule,mol_mass,cdn_scale_fac,N_sp_lines,nu0,Aul,E_u,E_l,gu,gl,isotopolog,nu_grid,mode,custom_partition_sum_int=[None]):
'''
This runs a RT on a single point with/without line overlap.
All variables use SI unit
Ng : float
Gas column density (m-2)
Tg: float
Gas temperature (K)
vturb : float
Turbulent velocity of the gas (m/s)
molecule : string or list of strings
Molecule name, e.g. 'CO2' or ['CO2','C2H2','HCN']
mol_mass : float or list of floats
Molecular mass(es) in atomic mass units (amu)
cdn_scale_fac : list
Column density scaling factor
N_sp_lines : list
List of number of lines of each molecule
nu0 : numpy array
Line frequencies (Hz)
Aul : numpy array
Einstein A coefficient
E_u : numpy array
Upper energy level
E_l : numpy array
Lower energy level
gu : numpy array
Statistical weight / degeneracy of upper level
gl : numpy array
Statistical weight / degeneracy of lower level
isotopolog : list
List containing isotopologue numbers (HITRAN format)
nu_grid : numpy array
Array containing spectral grid for the final spectra
mode : string
'line_by_line' or 'overlap' or 'both'
custom_partition_sum_file : string
Path pointing to file containing custom partition sum (2 columns: temperature, partition sums)
'''
N_nu = len(nu_grid)
N_lines = int(np.sum(N_sp_lines))
Ng = np.ones_like(nu0)*Ng
Tg = np.ones_like(nu0)*Tg
vg = np.zeros_like(nu0)
pop_u = np.zeros_like(nu0)
pop_l = np.zeros_like(nu0)
sigma = np.zeros_like(nu0)
Tauuu = np.zeros_like(nu0)
I_nu = np.zeros_like(nu0)
idx_start = 0
for isp in range(len(N_sp_lines)):
Ng[idx_start:idx_start+N_sp_lines[isp]] *= cdn_scale_fac[isp]
vg[idx_start:idx_start+N_sp_lines[isp]] = np.sqrt(vturb**2 + 2.0 * kb * Tg[idx_start:idx_start+N_sp_lines[isp]] / (mol_mass[isp] * amu))
if not custom_partition_sum_int[isp]==None:
QT = custom_partition_sum_int[isp](Tg[0])
else:
QT = hp.partitionSum(QT_mol_id[molecule[isp]],isotopolog[isp],Tg[0])
pop_u[idx_start:idx_start+N_sp_lines[isp]] = rs.boltzmann_distribution(E_u[idx_start:idx_start+N_sp_lines[isp]], gu[idx_start:idx_start+N_sp_lines[isp]], Tg[idx_start:idx_start+N_sp_lines[isp]], QT)
pop_l[idx_start:idx_start+N_sp_lines[isp]] = rs.boltzmann_distribution(E_l[idx_start:idx_start+N_sp_lines[isp]], gl[idx_start:idx_start+N_sp_lines[isp]], Tg[idx_start:idx_start+N_sp_lines[isp]], QT)
idx_start += N_sp_lines[isp]
Tauuu = (Aul*(c/nu0)**3/(8.0*np.pi**1.5*vg)*Ng*(gu/gl*pop_l-pop_u))
sigma = vg*nu0/c
tau_ret = np.zeros(N_nu)
I_nu_line = np.zeros(N_nu)
nu_axis = nu_grid
if mode in ['overlap', 'both']:
if nu_grid[0] > nu_grid[-1]:
nu_axis = nu_grid[::-1]
reverse_output = True
else:
reverse_output = False
half_width_factor = 10.0 # sigma_thresh/2 (20/2)
for i in range(N_lines):
center = nu0[i]
s = sigma[i]
width = half_width_factor * s
left = np.searchsorted(nu_axis, center - width)
right = np.searchsorted(nu_axis, center + width)
if left >= right:
continue
nu_slice = nu_axis[left:right]
delta = nu_slice - center
prof = rs.profile_function(nu_slice,center,s)
# np.exp(-(delta**2) / s**2)
tau_ret[left:right] += Tauuu[i]*prof
I_nu_line = rs.planck(Tg[0], nu_axis)*(1.0 - np.exp(-tau_ret))
if reverse_output:
tau_ret = tau_ret[::-1]
I_nu_line = I_nu_line[::-1]
if mode in ['line_by_line', 'both']:
I_nu = (rs.planck(Tg,nu0)*growth_function(Tauuu)*vg*nu0/c)
return (I_nu_line,tau_ret,I_nu,Tauuu,pop_l,pop_u)
[docs]
def run_1D_radial_slab(Ng,Tg,R,d,vturb,R_grid,output_filename,molecule,mol_mass,HITRANfile,cdn_scale_fac=[1],custom_partition_sum_file='',width='area_given',Rin_limit=0,Rout_limit=1e99,isotopolog=[1],wave_mol=[4,30],wave_spec=[4.9,28],verbose=False):
'''
This is a python implementation of 1D radial slab models, based on the FORTRAN version of 1D slab models in ProDiMo (https://prodimo.iwf.oeaw.ac.at/)
This runs models with line overlap. More robust and efficient code (including vertical slab model) is available in Fortran, please check the above website (especially for running large grids of models).
The Fortran code also supports OMP and supports including multiple species.
Ng : numpy array or list
Gas column density (cm-2)
Tg : numpy array or list
Gas temperature (K)
R : numpy array or list
Radius (AU)
d : float
Distance of the disk (parsec)
vturb : float
Turbulent velocity of the gas (km/s)
R_grid : float
High resolution grid on which the spectra has to be calculated initially
Recommend minimum of 1e5
output_filename : string
Filename(+path) for the fits output file
Use the extension '.fits.gz'
molecule : string
Molecule name, e.g. 'CO2'
molecule : string or list of strings
Molecule name, e.g. 'CO2' or ['CO2','C2H2','HCN']
mol_mass : float or list of floats
Molecular mass(es) in atomic mass units (amu)
HITRANfile : string or listt of strings
Path(s) to the HITRAN '.par' file(s) containing the line data downloaded from HITRAN
cdn_scale_fac : list of floats
Column density of the molecules are scaled by these factors (useful for including isotopologue ratios)
custom_partition_sum_file : string
Path pointing to file containing custom partition sum (2 columns: temperature, partition sums)
width : string or list or float
'area_given' - the provided radii array is used as the emitting area equivalent radius
'infer' - Assumes the provided radius as radial distance from the star and calculates the width of individual rings of slab grids
list - List containing emitting area equivalent radii
float - Constant emitting area equivalent radius, used for all grid points
Rin_limit : float
Inner radius limit of the disk (AU)
Rout_limit : float
Outer radius limit of the disk (AU)
isotopolog : list
List containing isotopologue numbers (HITRAN format)
wave_mol : list
Wavelength limits in microns to select the lines from the HITRAN file
wave_spec : list
Wavelength limits in microns for calculating the spectra
'''
if isinstance(output_filename,str):
tempname,tempext = os.path.splitext(output_filename)
temp2name,temp2ext = os.path.splitext(tempname)
if temp2ext+tempext == '.fits.gz':
output_filename = temp2name
else:
raise TypeError('output_filename should be a string')
if isinstance(molecule,str):
molecule = [molecule]
Nspecies = 1
Nspecies = len(molecule)
if not (isinstance(mol_mass,list) | isinstance(mol_mass,np.ndarray)):
mol_mass = [mol_mass]
if not (isinstance(cdn_scale_fac,list) | isinstance(cdn_scale_fac,np.ndarray)):
cdn_scale_fac = [cdn_scale_fac]
if not (isinstance(isotopolog,list) | isinstance(isotopolog,np.ndarray)):
isotopolog = [isotopolog]
if isinstance(HITRANfile,str):
HITRANfile = [HITRANfile]
if isinstance(custom_partition_sum_file,str):
custom_partition_sum_file = [custom_partition_sum_file]
if len(custom_partition_sum_file) != Nspecies:
custom_partition_sum_file *= Nspecies
if len(mol_mass)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of molecular masses ({len(mol_mass)}) supplied are not same!')
if len(HITRANfile)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of HITRAN filepaths ({len(HITRANfile)}) supplied are not same!')
if len(isotopolog)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of HITRAN isotopologue IDs ({len(isotopolog)}) supplied are not same!')
if len(cdn_scale_fac)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of column density scaling factors ({len(cdn_scale_fac)}) supplied are not same!')
nu_axis = rs.generate_grid(R=R_grid,lambda_0=wave_spec[0],lambda_n=wave_spec[1],sampling=1)*1e9
N_nu = len(nu_axis)
W_grid = c / nu_axis / um
nu_grid = nu_axis.reshape(N_nu)
W_grid = W_grid.reshape(N_nu)
Ng = np.array(Ng)*percm2_to_perm2
vturb = np.array(vturb)*kmps
N_sp_lines,mol_data,N_lines,custom_partition_sum_int = [],[],[],[]
nu_0,Aul,gu,gl,E_u,E_l = np.array([]),np.array([]),np.array([]),np.array([]),np.array([]),np.array([])
for isp in range(Nspecies):
mol_data.append(ht.read_hitran(HITRANfile[isp],molecule[isp],[isotopolog[isp]],wave_mol[0],wave_mol[1]))
N_sp_lines.append(len(mol_data[isp]))
lambda_0 = np.array(mol_data[isp]['lambda']) * um
nu_0 = np.concatenate((nu_0,(c / lambda_0).reshape(N_sp_lines[isp])))
Aul = np.concatenate((Aul,np.array(mol_data[isp]['A']).reshape(N_sp_lines[isp])))
gu = np.concatenate((gu,np.array(mol_data[isp]['g_u']).reshape(N_sp_lines[isp])))
gl = np.concatenate((gl,np.array(mol_data[isp]['g_l']).reshape(N_sp_lines[isp])))
E_u = np.concatenate((E_u,np.array(mol_data[isp]['E_u']).reshape(N_sp_lines[isp])))
E_l = np.concatenate((E_l,np.array(mol_data[isp]['E_l']).reshape(N_sp_lines[isp])))
custom_partition_sum_int.append(None)
if not(isinstance(custom_partition_sum_file[isp],str) or custom_partition_sum_file[isp] == None):
raise TypeError("Expected 'string' type for custom_partition_sum_file")
if custom_partition_sum_file[isp] == None:
custom_partition_sum_file[isp] = ''
if custom_partition_sum_file[isp] != '':
custom_partition_sum_int[isp] = read_partition_sums(custom_partition_sum_file[isp])
R = np.array(R)*au
Rin_limit *= au
Rout_limit *= au
Rr = np.zeros_like(R)
if width == 'infer':
if len(R)>1:
if ((R[1]-R[0])/2>R[0]) or ((R[1]-R[0])/2>(R[0]-Rin_limit)) :
Rr[0] = (R[0]+(R[1]-R[0])/2)**2-Rin_limit**2
else:
Rr[0] = (R[0]+(R[1]-R[0])/2)**2-(R[0]-(R[1]-R[0])/2)**2
if ((R[-2]-R[-1])/2>(Rout_limit-R[-1])) :
Rr[-1] = Rout_limit**2-(R[-2]+(R[-1]-R[-2])/2)**2
else:
Rr[-1] = ((R[-1]-R[-2])/2+R[-1])**2-(R[-1]-(R[-1]-R[-2])/2)**2
if len(R)>2:
for i in range(len(Ng)-2):
Rr[i+1] = (R[i+1]+(R[i+2]-R[i+1])/2)**2-(R[i+1]-(R[i+1]-R[i])/2)**2
else:
Rr = R**2
elif width == 'area_given':
Rr = R**2
elif isinstance(width,float) or isinstance(width,int):
Rr+=width**2
else:
for i in range(len(Ng)):
Rr[i] = (R[i]+width[i]/2)**2-(R[i]-width[i]/2)**2
d *= pc
if isinstance(vturb,float) or isinstance(vturb,int): vturb = np.ones_like(Ng)*vturb
total_flux = np.zeros_like(nu_grid)
for i in range(len(Ng)):
I_nu_line,tau_ret,I_nu,tau_lbl,pop_l,pop_u = run_0D_point(Ng[i],Tg[i],vturb[i],molecule,mol_mass,cdn_scale_fac,N_sp_lines,nu_0,Aul,E_u,E_l,gu,gl,isotopolog,nu_grid,'overlap',custom_partition_sum_int)
total_flux += I_nu_line*np.pi*Rr[i]/d**2
slab = convert_to_slab_format(nu_grid,total_flux,tau_ret*0.0,I_nu*0.0,tau_lbl*0.0,R_grid,0,0,[0],0,0,0,0,molecule,N_sp_lines,'overlap')
slab.write_to_file(output_filename,'overlap',verbose=verbose)
return
[docs]
def growth_function(tau):
'''
This function returns the growth function, or the flux growth for the given line optical depth(s)
tau : numpy array or list
line optical depth
'''
tau = np.asanyarray(tau)
growth = np.zeros_like(tau)
cond2 = (tau >= 1e-13) & (tau < 10)
cond3 = (tau >= 10)
t2 = tau[cond2]
if t2.size > 0:
growth[cond2] = (7.98554359 * t2**0.9999470274) / (4.509393063 + 1.72466517 * t2**1.062901665)
t3 = tau[cond3]
if t3.size > 0:
growth[cond3] = 2.8859693 * (np.log10(t3)**0.50868585) + 0.40682335
return growth
[docs]
def get_dust_opacity_provider(dust_sizes, wavelengths, Q_abs_table):
"""
Factory that returns a pre-configured interpolation function.
"""
log_sizes = np.log10(dust_sizes)
log_waves = np.log10(wavelengths)
log_Q = np.log10(Q_abs_table + 1e-99)
interp_engine = RegularGridInterpolator(
(log_sizes, log_waves), log_Q,
method='linear', bounds_error=False, fill_value=None
)
def get_Q(d_size, wave):
d_size, wave = np.broadcast_arrays(d_size, wave)
query_points = np.stack([np.log10(d_size + 1e-99),
np.log10(wave + 1e-99)], axis=-1)
return 10**interp_engine(query_points)
return get_Q
[docs]
class slab_model_1D:
"""
class:: slab_model_1D
Class to run 1D forward slab models
"""
def __init__(self):
self.path=None
self.lambda_0=None
self.lambda_n=None
self.NLAMBDA=None
self.R_grid=None
self.nu_grid=None
self.wavelength_grid=None
self.total_source_function=None
self.flux_dust_only = None
self.dust_source_function = None
self.tau_grid=None
self.tau_dust_grid=None
self.flux=None
self.convWavelength=None
self.convFlux=None
self.convR=None
self.convContFlux=None
self.continuum=None
self.amin=None
self.amax=None
self.apow=None
self.da=None
self.NSIZE=None
self.dust_to_gas=None
self.fsize=None
self.asize=None
self.Qabs=None
self.em_coeff_dust=None
self.ab_coeff_dust=None
self.nd=None
self.Td=None
self.dust_settle=None
self.dust_settle_alpha=None
self.grain_density=None #kg/m3
self.linedata=None
self.leveldata=None
self.LineSelection=None
self.rules=None
self.mol_data=None
self.NGRID=None
self.z=None
self.dz=None
self.r_au=None
self.M_star=None
self.ng=None
self.Tg=None
self.nH=None
self.rhog=None
self.Nspecies=None
self.em_coeff_gas=None
self.ab_coeff_gas=None
self.vturb=None
self.vg=None
self.Hg=None
self.Qmatrix_file=None
self.Qmatrix_amin=None
self.Qmatrix_amax=None
self.Qmatrix_asize=None
self.Qmatrix_NSIZE=None
self.Qmatrix_NLAMMAX=None
self.Qmatrix_NDUST=None
self.Qmatrix_lammin=9.12e-6 #cm
self.Qmatrix_lammax=1 #cm
self.Qmatrix_lam=None
self.Qmatrix_Qabs=None
self.verbose=True
[docs]
def unit_conversions(self):
self.z *= au
if not (self.nd is None):
self.nd *= percm3_to_perm3
self.ng *= percm3_to_perm3
self.nH *= percm3_to_perm3
self.vturb *= kmps
if not (self.rhog is None):
self.rhog *= gpercm3_to_kgperm3
if not (self.grain_density is None):
self.grain_density *= gpercm3_to_kgperm3
self.amin*=um_to_m
self.amax*=um_to_m
[docs]
def reset_units(self):
self.z /= au
if not (self.nd is None):
self.nd /= percm3_to_perm3
self.ng /= percm3_to_perm3
self.nH /= percm3_to_perm3
self.vturb /= kmps
if not (self.rhog is None):
self.rhog /= gpercm3_to_kgperm3
if not (self.grain_density is None):
self.grain_density /= gpercm3_to_kgperm3
self.amin/=um_to_m
self.amax/=um_to_m
[docs]
def init_grid(self):
if self.verbose: print('INIT GRID')
self.nu_grid=rs.generate_grid(R=self.R_grid,lambda_0=self.lambda_0,lambda_n=self.lambda_n,sampling=1)*1e9
self.wavelength_grid = c/self.nu_grid
self.NLAMBDA=len(self.nu_grid)
self.total_source_function=np.zeros((self.NGRID,self.NLAMBDA))
self.tau_grid=np.zeros((self.NGRID,self.NLAMBDA))
self.tau_dust_grid=np.zeros((self.NGRID,self.NLAMBDA))
self.dz = np.gradient(self.z)
self.dust_source_function = np.zeros((self.NGRID, self.NLAMBDA))
[docs]
def read_Qmatrix(self):
if self.verbose: print('READ QMATRIX')
if not(self.Qmatrix_file==None):
hdul=fits.open(self.Qmatrix_file)
self.Qmatrix_amin=hdul[0].header['AMIN']
self.Qmatrix_amax=hdul[0].header['AMAX']
self.Qmatrix_NSIZE=hdul[0].header['NAXIS5']
self.Qmatrix_NLAMMAX=hdul[0].header['NAXIS6']
self.Qmatrix_NDUST=hdul[0].header['NDUST']
# self.Qmatrix_lammin=91.2e-3#hdul[0].header['LAMMIN']
# self.Qmatrix_lammax=1e4#hdul[0].header['LAMMAX']
self.Qmatrix_Qabs=hdul[0].data[2,:,:,0,0,0,0].T
self.Qmatrix_amin*=cm_to_m
self.Qmatrix_amax*=cm_to_m
self.Qmatrix_lammin*=cm_to_m
self.Qmatrix_lammax*=cm_to_m
[docs]
def init_dust(self):
if self.verbose: print('INIT DUST')
self.Qmatrix_lam=np.logspace(np.log10(self.Qmatrix_lammin),np.log10(self.Qmatrix_lammax),self.Qmatrix_NLAMMAX)
self.Qmatrix_asize=np.logspace(np.log10(self.Qmatrix_amin),np.log10(self.Qmatrix_amax),self.Qmatrix_NSIZE)
self.asize=np.logspace(np.log10(self.amin),np.log10(self.amax),self.NSIZE)
self.da=np.zeros(self.NSIZE)
diff=0.5*np.diff(self.asize)
self.da[1:]+=diff
self.da[:-1]+=diff
temp_fsize=(1-self.apow)/(self.amax**(1-self.apow)-self.amin**(1-self.apow))*self.asize**(-self.apow)
integral = np.sum(temp_fsize * self.da)
temp_fsize /= integral
self.fsize=np.repeat(temp_fsize.reshape((self.NSIZE,1)),[self.NGRID],axis=1).T
if self.nd is None:
if self.rhog is None:
rho_gas=self.nH*mu*amu
else:
rho_gas=self.rhog
grain_masses=(4.0/3.0)*np.pi*self.asize**3*self.grain_density
mean_grain_mass=np.sum(grain_masses*temp_fsize*self.da)
self.nd = (rho_gas*self.dust_to_gas)/mean_grain_mass
if self.dust_settle:
self.solve_dust_settle(r_au=self.r_au, M_star=self.M_star, grain_density=self.grain_density)
get_Q = get_dust_opacity_provider(self.Qmatrix_asize, self.Qmatrix_lam, self.Qmatrix_Qabs)
sizes_2d, waves_2d = np.meshgrid(self.asize, self.wavelength_grid, indexing='ij')
q_grid = get_Q(sizes_2d.ravel(), waves_2d.ravel()).reshape(sizes_2d.shape)
self.ab_coeff_dust=np.zeros((self.NGRID,self.NLAMBDA))
for i in range(self.NGRID):
geometric_term = (self.fsize[i,:]*np.pi*self.asize**2*self.da)[:, np.newaxis]
self.ab_coeff_dust[i,:] = self.nd[i]*np.sum(geometric_term*q_grid, axis=0)
del q_grid,sizes_2d,waves_2d,geometric_term
[docs]
def solve_dust_settle(self, r_au=1.0, M_star=1.0, grain_density=3000):
"""
r_au: Distance from star in AU
M_star: Stellar mass in Solar Masses
grain_density: kg/m^3 (e.g., 3000 for silicates)
"""
if self.verbose: print('SOLVE DUST SETTLE')
r_m = r_au*au
omega = np.sqrt(G*(M_star*M_sun)/r_m**3)
v_th = np.sqrt((8*kb*self.Tg)/(np.pi*mu*amu))
A, RHO = np.meshgrid(self.asize,self.nH*mu*amu)
VTH, _ = np.meshgrid(v_th, self.asize, indexing='ij')
stokes = (grain_density*A)/(RHO*VTH)*omega
alpha = self.dust_settle_alpha
h_ratio = np.sqrt(alpha/(alpha+stokes))
v_sound = np.sqrt((kb*self.Tg)/(mu*amu))
Hg = v_sound/omega
Hg_2d = np.repeat(Hg[:,np.newaxis], self.NSIZE, axis=1)
Z, _ = np.meshgrid(self.z, self.asize, indexing='ij')
settling_factor = np.exp(-(Z**2)/(2*(Hg_2d*h_ratio)**2))
f_mid = (1-self.apow)/(self.amax**(1-self.apow)-self.amin**(1-self.apow))*self.asize**(-self.apow)
density_drop = np.sum(f_mid*settling_factor*self.da, axis=1)
mask = density_drop > 1e-100
self.nd = self.nd*density_drop
new_fsize_unnorm = f_mid*settling_factor
self.fsize = np.zeros_like(new_fsize_unnorm)
for i in range(self.NGRID):
if mask[i]:
self.fsize[i, :] = new_fsize_unnorm[i, :]/density_drop[i]
else:
self.fsize[i, :] = f_mid
[docs]
def init_gas(self):
if self.verbose: print('INIT GAS')
self.rules = ht.read_line_selection(self.LineSelection)
self.mol_data = [ht.read_hitran_from_rules(rule) for rule in self.rules]
self.vg = np.zeros((self.NGRID,len(self.rules)))
for i in range(len(self.rules)):
self.vg[:,i] = np.sqrt(self.vturb**2+2.0*kb*self.Tg/(self.rules[i].mass*amu))
for i in range(len(self.rules)):
self.mol_data[i]['nu_Hz'] = (c/(np.array(self.mol_data[i]['lambda'])*um))
[docs]
def calculate_source(self, igrid):
N_nu = len(self.nu_grid)
nu_axis = self.nu_grid.copy()
is_reversed = False
if nu_axis[0]>nu_axis[-1]:
nu_axis = nu_axis[::-1]
is_reversed = True
numerator = np.zeros_like(nu_axis)
denominator = np.zeros_like(nu_axis)
iTg = self.Tg[igrid]
iTd = self.Td[igrid]
kappa_dust = self.ab_coeff_dust[igrid, :].copy()
if is_reversed:
kappa_dust = kappa_dust[::-1]
numerator += rs.planck(iTd, nu_axis)*kappa_dust
denominator += kappa_dust
half_width_factor = 10.0
for imol in range(len(self.rules)):
ing = self.ng[imol, igrid]
mol = self.mol_data[imol]
ivg = self.vg[igrid, imol]
inu0 = np.array(mol['nu_Hz'])
iQT = hp.partitionSum(QT_mol_id[self.rules[imol].name.split('_')[0]], self.rules[imol].iso, iTg)
ipop_u = rs.boltzmann_distribution(np.array(mol['E_u']), np.array(mol['g_u']), iTg, iQT)
ipop_l = rs.boltzmann_distribution(np.array(mol['E_l']), np.array(mol['g_l']), iTg, iQT)
kappa_g_base = (np.array(mol['A'])*(c/inu0)**3/(8.0*np.pi**1.5*ivg)*ing*(np.array(mol['g_u'])/np.array(mol['g_l'])*ipop_l-ipop_u))
j_g_base = h*c*ing*ipop_u*np.array(mol['A'])/(4*np.pi**1.5*ivg)
sigma = ivg*inu0/c
for i in range(len(inu0)):
center = inu0[i]
s = sigma[i]
width = half_width_factor*s
idx_l = np.searchsorted(nu_axis, center-width)
idx_r = np.searchsorted(nu_axis, center+width)
if idx_l >= idx_r: continue
prof = rs.profile_function(nu_axis[idx_l:idx_r], center, s)
numerator[idx_l:idx_r] += j_g_base[i]*prof
denominator[idx_l:idx_r] += kappa_g_base[i]*prof
source_work = np.divide(numerator, denominator, out=np.zeros_like(numerator), where=denominator!=0)
tau_work = denominator*self.dz[igrid]
if is_reversed:
self.total_source_function[igrid, :] = source_work[::-1]
self.tau_grid[igrid, :] = tau_work[::-1]
else:
self.total_source_function[igrid, :] = source_work
self.tau_grid[igrid, :] = tau_work
[docs]
def calculate_dust_source(self, igrid):
"""Calculates source function and tau for dust only."""
kappa_d = self.ab_coeff_dust[igrid, :]
self.dust_source_function[igrid, :] = rs.planck(self.Td[igrid], self.nu_grid)
self.tau_dust_grid[igrid,:] = kappa_d*self.dz[igrid]
[docs]
def calculate_spectrum(self, I_background=None):
"""
Calculates the emergent intensity I_nu from the 1D slab.
I_background: Optional array of size (NLAMBDA) for radiation
entering from the bottom (e.g., stellar/disk midplane).
"""
if self.verbose: print('CALCULATE SPECTRUM')
if I_background is None:
I_nu = np.zeros(self.NLAMBDA)
else:
I_nu = I_background
for i in range(self.NGRID):
S_nu = self.total_source_function[i, :]
d_tau = self.tau_grid[i, :]
transmission = np.exp(-d_tau)
I_nu = I_nu*transmission+S_nu*(1.0-transmission)
return I_nu
[docs]
def calculate_dust_continuum(self,I_background=None):
if self.verbose: print('CALCULATE DUST CONTINUUM')
if I_background is None:
I_nu = np.zeros(self.NLAMBDA)
else:
I_nu = I_background
for i in range(self.NGRID):
S_nu = self.dust_source_function[i, :]
d_tau = self.tau_dust_grid[i, :]
d_tau = np.clip(d_tau, 0, 500)
transmission = np.exp(-d_tau)
I_nu = I_nu*transmission+S_nu*(1.0-transmission)
return I_nu
[docs]
def run_model(self):
if self.verbose: print('RUNNING MODEL-\n')
self.unit_conversions()
self.init_grid()
self.init_gas()
self.read_Qmatrix()
self.init_dust()
for i in tqdm(range(self.NGRID), desc="Calculating source function:"):
self.calculate_source(i)
self.calculate_dust_source(i)
self.flux = self.calculate_spectrum()
self.continuum = self.calculate_dust_continuum()
[docs]
def convolve(self,R=1,lambda_0=1,lambda_n=1):
if self.verbose: print('CONVOLVE SPECTRUM')
if lambda_0==lambda_n:
lambda_0=np.amin(self.wavelength_grid)*1e6
lambda_n=np.amax(self.wavelength_grid)*1e6
lambda_0=lambda_0*10**-0.05
lambda_n=lambda_n*10**0.05
mask=(self.wavelength_grid<lambda_n*1e-6)&(self.wavelength_grid>lambda_0*1e-6)
FWHM=self.R_grid/R/2.355
g=Gaussian1DKernel(stddev=FWHM,factor=7)
self.convFlux=apy_convolve(self.flux[mask],g)
self.convContFlux=apy_convolve(self.continuum[mask],g)
self.convWavelength=self.wavelength_grid[mask]*1e6
self.convR=R
[docs]
def convolve_custom(self,values,R=1,lambda_0=1,lambda_n=1):
if self.verbose: print('CONVOLVE CUSTOM')
if lambda_0==lambda_n:
lambda_0=np.amin(self.wavelength_grid)*1e6
lambda_n=np.amax(self.wavelength_grid)*1e6
lambda_0=lambda_0*10**-0.05
lambda_n=lambda_n*10**0.05
mask=(self.wavelength_grid<lambda_n*1e-6)&(self.wavelength_grid>lambda_0*1e-6)
FWHM=self.R_grid/R/2.355
g=Gaussian1DKernel(stddev=FWHM,factor=7)
return(apy_convolve(values[mask],g))
[docs]
def from_prodimo(self,model_path,r,species=[],LineSelectionFile='LineSelection.in',MieFile='',lambda_0=4.9,lambda_n=28,R_grid=1e5):
if self.verbose: print('FROM PRODIMO')
model_2d = pread.read_prodimo(model_path)
ix = np.argmin(np.abs(model_2d.x[:,0]-r))
if isinstance(species,list):
isps = [model_2d.spnames[sp] for sp in species]
elif isinstance(species,str):
isps = [model_2d.spnames[species]]
else:
raise ValueError("species should be either list of strings, or a string")
if MieFile != '':
self.Qmatrix_file = MieFile
else:
self.Qmatrix_file = model_path+'/Mie.fits.gz'
self.path=model_path
self.lambda_0=lambda_0
self.lambda_n=lambda_n
self.R_grid=R_grid
self.amin=model_2d.params['amin']*1e4
self.amax=model_2d.params['amax']*1e4
self.apow=model_2d.params['apow']
self.NSIZE=50
self.dust_to_gas=model_2d.params['dust_to_gas_c']
self.dust_settle=True
self.dust_settle_alpha=model_2d.params['a_settle']
self.grain_density=3 #g/cm3
self.NGRID=model_2d.params['nzz']
self.M_star=model_2d.star.mass
self.r_au=r*1.0
self.z=model_2d.z[ix,:].copy()
self.Td=model_2d.td[ix,:]
self.nH=model_2d.nHtot[ix,:].copy()
self.rhog=model_2d.rhog[ix,:].copy()
self.ng=np.array([model_2d.nmol[ix,:,isp] for isp in isps])
self.nd=model_2d.nd[ix,:]
self.Tg=np.clip(model_2d.tg[ix,:].copy(), None, 5000)
self.vturb=np.ones_like(self.Tg)*model_2d.params['v_turb']*1e-3
self.LineSelection = LineSelectionFile