"""
.. module:: run_slab
:synopsis: Python version of slab models
.. moduleauthor:: A. M. Arabhavi
"""
# # Imports
import numpy as np
import pandas as pd
import prodimopy.hitran as ht
import prodimopy.plot_slab as ps
import prodimopy.read_slab as rs
from astropy.io import fits
import pickle
from astropy.convolution import Gaussian1DKernel
from astropy.convolution import convolve as apy_convolve
from scipy.interpolate import RegularGridInterpolator
import hapi as hp
from scipy.interpolate import interp1d
import prodimopy.read as pread
from tqdm.auto import tqdm
import os
from concurrent.futures import ProcessPoolExecutor
from functools import partial
# # Constant definitions
from scipy.constants import c,h,G
from scipy.constants import k as kb
from scipy.constants import astronomical_unit as au
from scipy.constants import parsec as pc
from scipy.constants import atomic_mass as amu
from astropy.constants import M_sun
from astropy import units as u
a = 1*u.Jy
percm2_to_perm2 = 1e4
gpercm3_to_kgperm3 = 1e3
cm2perg_to_m2perkg = 1e-1
percm3_to_perm3 = 1e6
cm_to_m = 1e-2
um_to_m = 1e-6
kmps = rs.kmps
um = ((1*u.micron).to(u.m)).value
cm_K = c*h/kb*1e2
jy = (1*u.Jy).to(u.kg/u.s**2).value
mu = 2.3
M_sun = M_sun.si.value
# # Function definitions
[docs]
def read_partition_sums(custom_partition_sum_file):
try:
custom_partition_sums = np.loadtxt(custom_partition_sum_file)
except:
try:
custom_partition_sums = np.loadtxt(custom_partition_sum_file,delimiter=',')
except:
raise ValueError(f'Cannot read {custom_partition_sum_file}')
return(interp1d(custom_partition_sums[:,0],custom_partition_sums[:,1]))
QT_mol_id = {name: mol_id for mol_id, name in set([(i[1][0],i[1][-1]) for i in list(hp.ISO_ID.items())])}
[docs]
def run_0D_slab(Ng,Tg,vturb,molecule,mol_mass,HITRANfile,cdn_scale_fac=[1],custom_partition_sum_file='',isotopolog=[1],wave_mol=[4,30],wave_spec=[4.9,28],R_grid=1e5,output='both',output_filename='default.out',mode='both',convolve_R=None,verbose=True):
'''
This is a python implementation of 0D slab models, based on the FORTRAN version of 0D slab models in ProDiMo (https://prodimo.iwf.oeaw.ac.at/)
This runs models with/without line overlap. More robust and efficient code is available on Fortran, please check the above website (especially for running large grids of models).
The Fortran code also supports OMP and supports including multiple species.
Ng : float
Gas column density (cm-2)
Tg: float
Gas temperature (K)
vturb : float
Turbulent velocity of the gas (km/s)
molecule : string or list of strings
Molecule name, e.g. 'CO2' or ['CO2','C2H2','HCN']
mol_mass : float or list of floats
Molecular mass(es) in atomic mass units (amu)
HITRANfile : string or listt of strings
Path(s) to the HITRAN '.par' file(s) containing the line data downloaded from HITRAN
cdn_scale_fac : list of floats
Column density of the molecules are scaled by these factors (useful for including isotopologue ratios)
custom_partition_sum_file : string
Path pointing to file containing custom partition sum (2 columns: temperature, partition sums)
isotopolog : list
List containing isotopologue numbers (HITRAN format)
wave_mol : list
Wavelength limits in microns to select the lines from the HITRAN file
wave_spec : list
Wavelength limits in microns for calculating the spectra
R_grid : float
High resolution grid on which the spectra has to be calculated initially
Recommend minimum of 1e5
output: string
'file' to write output to file, 'return' to return a slab type, 'both' for both
output_filename: string
Filename(+path) for the output file
mode : string
'overlap' or 'line_by_line' or 'both'
convolve_R : float
Specrtal resolving power to which the spectra has to be convolved (3500 for JWST/MIRI)
verbose : boolean
Whether or not to print progress log
'''
if not isinstance(output_filename, str):
raise TypeError("output_filename should be a string")
if isinstance(molecule,str):
molecule = [molecule]
Nspecies = 1
Nspecies = len(molecule)
if not (isinstance(mol_mass,list) | isinstance(mol_mass,np.ndarray)):
mol_mass = [mol_mass]
if not (isinstance(cdn_scale_fac,list) | isinstance(cdn_scale_fac,np.ndarray)):
cdn_scale_fac = [cdn_scale_fac]
if not (isinstance(isotopolog,list) | isinstance(isotopolog,np.ndarray)):
isotopolog = [isotopolog]
if isinstance(HITRANfile,str):
HITRANfile = [HITRANfile]
if isinstance(custom_partition_sum_file,str):
custom_partition_sum_file = [custom_partition_sum_file]
if len(custom_partition_sum_file) != Nspecies:
custom_partition_sum_file *= Nspecies
if len(mol_mass)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of molecular masses ({len(mol_mass)}) supplied are not same!')
if len(HITRANfile)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of HITRAN filepaths ({len(HITRANfile)}) supplied are not same!')
if len(isotopolog)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of HITRAN isotopologue IDs ({len(isotopolog)}) supplied are not same!')
if len(cdn_scale_fac)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of column density scaling factors ({len(cdn_scale_fac)}) supplied are not same!')
nu_axis = rs.generate_grid(R=R_grid,lambda_0=wave_spec[0],lambda_n=wave_spec[1],sampling=1)*1e9
N_nu = len(nu_axis)
W_grid = c / nu_axis / um
nu_grid = nu_axis.reshape(1, 1, N_nu)
W_grid = W_grid.reshape(1, 1, N_nu)
Ng *= percm2_to_perm2
vturb *= kmps
N_sp_lines,mol_data,N_lines,custom_partition_sum_int = [],[],[],[]
nu_0,Aul,gu,gl,E_u,E_l = np.array([]),np.array([]),np.array([]),np.array([]),np.array([]),np.array([])
for isp in range(Nspecies):
mol_data.append(ht.read_hitran(HITRANfile[isp],molecule[isp],[isotopolog[isp]],wave_mol[0],wave_mol[1]))
N_sp_lines.append(len(mol_data[isp]))
lambda_0 = np.array(mol_data[isp]['lambda']) * um
nu_0 = np.concatenate((nu_0,(c / lambda_0).reshape(N_sp_lines[isp])))
Aul = np.concatenate((Aul,np.array(mol_data[isp]['A']).reshape(N_sp_lines[isp])))
gu = np.concatenate((gu,np.array(mol_data[isp]['g_u']).reshape(N_sp_lines[isp])))
gl = np.concatenate((gl,np.array(mol_data[isp]['g_l']).reshape(N_sp_lines[isp])))
E_u = np.concatenate((E_u,np.array(mol_data[isp]['E_u']).reshape(N_sp_lines[isp])))
E_l = np.concatenate((E_l,np.array(mol_data[isp]['E_l']).reshape(N_sp_lines[isp])))
custom_partition_sum_int.append(None)
if not(isinstance(custom_partition_sum_file[isp],str) or custom_partition_sum_file[isp] == None):
raise TypeError("Expected 'string' type for custom_partition_sum_file")
if custom_partition_sum_file[isp] == None:
custom_partition_sum_file[isp] = ''
if custom_partition_sum_file[isp] != '':
custom_partition_sum_int[isp] = read_partition_sums(custom_partition_sum_file[isp])
I_nu_line, tau_ret, I_nu, tau_lbl, pop_l, pop_u = run_0D_point(Ng,Tg,vturb,molecule,mol_mass,cdn_scale_fac,N_sp_lines,nu_0,Aul,E_u,E_l,gu,gl,isotopolog,nu_axis,mode,custom_partition_sum_int)
slab = convert_to_slab_format(nu_grid,I_nu_line,tau_ret,I_nu,tau_lbl,R_grid,Ng,Tg,cdn_scale_fac,vturb,pop_l,pop_u,mol_data,molecule,N_sp_lines,mode)
if output in ['file', 'both']:
if mode in ['overlap','both']:
slab[0].write_to_file(output_filename, 'overlap', verbose)
if mode in ['line_by_line','both']:
slab.write_to_file(output_filename, 'line_by_line', verbose)
if output in ['return', 'both']:
if not (convolve_R is None):
slab.convolve(R=convolve_R,lambda_0=wave_spec[0],lambda_n=wave_spec[1],overlap= True if mode in ['overlap','both'] else False,verbose=verbose)
return(slab)
return()
def _worker_slab(indexed_params, molecule, mol_mass, cdn_scale_fac, N_sp_lines, nu_0, Aul, E_u, E_l, gu, gl,
isotopolog, nu_axis, mode, custom_partition_sum_int,
nu_grid, R_grid, mol_data, output, output_directory, base_filename, verbose, convolve_R, wave_spec):
idx, (Ng_val, Tg_val, vturb_val) = indexed_params
current_filename = f"{base_filename}_{idx+1:04d}"
current_filename = os.path.join(output_directory,current_filename)
I_nu_line, tau_ret, I_nu, tau_lbl, pop_l, pop_u = run_0D_point(Ng_val, Tg_val, vturb_val, molecule, mol_mass, cdn_scale_fac, N_sp_lines, nu_0, Aul, E_u, E_l, gu, gl, isotopolog, nu_axis, mode, custom_partition_sum_int)
slab = convert_to_slab_format(nu_grid, I_nu_line, tau_ret, I_nu, tau_lbl, R_grid, Ng_val, Tg_val, cdn_scale_fac, vturb_val, pop_l, pop_u, mol_data, molecule, N_sp_lines, mode)
if output in ['file', 'both']:
if mode in ['overlap','both']:
slab[0].write_to_file(current_filename, 'overlap', verbose)
if mode in ['line_by_line','both']:
slab.write_to_file(current_filename, 'line_by_line', verbose)
if output in ['return', 'both'] and convolve_R is not None:
slab.convolve(R=convolve_R, lambda_0=wave_spec[0], lambda_n=wave_spec[1], overlap=True if mode in ['overlap', 'both'] else False, verbose=verbose)
return slab if output in ['return', 'both'] else None
[docs]
def run_0D_slab_grid(Ng,Tg,vturb,molecule,mol_mass,HITRANfile,cdn_scale_fac=[1],custom_partition_sum_file='',isotopolog=[1],wave_mol=[4,30],wave_spec=[4.9,28],R_grid=1e5,output='both',output_directory='.',output_filename_prefix='default',mode='both',convolve_R=None,write_index_file=True,number_of_cores= 1,verbose=True):
'''
This is a python implementation of 0D slab models, based on the FORTRAN version of 0D slab models in ProDiMo (https://prodimo.iwf.oeaw.ac.at/)
This runs models with/without line overlap. More robust and efficient code is available on Fortran, please check the above website (especially for running large grids of models).
The Fortran code also supports OMP and supports including multiple species.
Ng : list or numpy.ndarray
Gas column density (cm-2)
Tg: list or numpy.ndarray
Gas temperature (K)
vturb : list or numpy.ndarray
Turbulent velocity of the gas (km/s)
molecule : string or list of strings
Molecule name, e.g. 'CO2' or ['CO2','C2H2','HCN']
mol_mass : float or list of floats
Molecular mass(es) in atomic mass units (amu)
HITRANfile : string or listt of strings
Path(s) to the HITRAN '.par' file(s) containing the line data downloaded from HITRAN
cdn_scale_fac : list of floats
Column density of the molecules are scaled by these factors (useful for including isotopologue ratios)
custom_partition_sum_file : string
Path pointing to file containing custom partition sum (2 columns: temperature, partition sums)
isotopolog : list
List containing isotopologue numbers (HITRAN format)
wave_mol : list
Wavelength limits in microns to select the lines from the HITRAN file
wave_spec : list
Wavelength limits in microns for calculating the spectra
R_grid : float
High resolution grid on which the spectra has to be calculated initially
Recommend minimum of 1e5
output : string
'file' to write output to file, 'return' to return a slab type, 'both' for both
output_directory : string
Directory path where the output files should be written
output_filename_prefix : string
Filename prefix for the output file without file extension, a number will be appended to this corresponding to each slab model grid point
mode : string
'overlap' or 'line_by_line' or 'both'
convolve_R : float
Specrtal resolving power to which the spectra has to be convolved (3500 for JWST/MIRI)
write_index_file : string
Write an index file containing the index number of each file and the corresponding Ng, Tg, vturb settings for that model
number_of_cores : int or string
Number of cores to use to parallelize the grid run. A number or 'max', 'half', or 'quarter' can be specified
verbose : boolean
Whether or not to print progress log
'''
if not isinstance(output_filename_prefix, str):
raise TypeError("output_filename should be a string")
parameter_grid = list(enumerate(zip(Ng, Tg, vturb)))
if write_index_file:
index_data = np.array([np.arange(len(Ng)) + 1, Ng, Tg, vturb]).T
index_filename = os.path.join(output_directory,f"{output_filename_prefix}_index.txt")
np.savetxt(index_filename, index_data, fmt=['%05d', '%.4e', '%.2f', '%.2f'], header='# Index | Ng [cm-2] | Tg [K] | vturb [km/s]',delimiter=' ')
if isinstance(molecule,str):
molecule = [molecule]
Nspecies = 1
Nspecies = len(molecule)
if not (isinstance(mol_mass,list) | isinstance(mol_mass,np.ndarray)):
mol_mass = [mol_mass]
if not (isinstance(cdn_scale_fac,list) | isinstance(cdn_scale_fac,np.ndarray)):
cdn_scale_fac = [cdn_scale_fac]
if not (isinstance(isotopolog,list) | isinstance(isotopolog,np.ndarray)):
isotopolog = [isotopolog]
if isinstance(HITRANfile,str):
HITRANfile = [HITRANfile]
if isinstance(custom_partition_sum_file,str):
custom_partition_sum_file = [custom_partition_sum_file]
if len(custom_partition_sum_file) != Nspecies:
custom_partition_sum_file *= Nspecies
if len(mol_mass)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of molecular masses ({len(mol_mass)}) supplied are not same!')
if len(HITRANfile)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of HITRAN filepaths ({len(HITRANfile)}) supplied are not same!')
if len(isotopolog)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of HITRAN isotopologue IDs ({len(isotopolog)}) supplied are not same!')
if len(cdn_scale_fac)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of column density scaling factors ({len(cdn_scale_fac)}) supplied are not same!')
nu_axis = rs.generate_grid(R=R_grid,lambda_0=wave_spec[0],lambda_n=wave_spec[1],sampling=1)*1e9
N_nu = len(nu_axis)
W_grid = c / nu_axis / um
nu_grid = nu_axis.reshape(1, 1, N_nu)
W_grid = W_grid.reshape(1, 1, N_nu)
Ng = np.array(Ng)*percm2_to_perm2
vturb = np.array(vturb)*kmps
N_sp_lines,mol_data,N_lines,custom_partition_sum_int = [],[],[],[]
nu_0,Aul,gu,gl,E_u,E_l = np.array([]),np.array([]),np.array([]),np.array([]),np.array([]),np.array([])
for isp in range(Nspecies):
mol_data.append(ht.read_hitran(HITRANfile[isp],molecule[isp],[isotopolog[isp]],wave_mol[0],wave_mol[1]))
N_sp_lines.append(len(mol_data[isp]))
lambda_0 = np.array(mol_data[isp]['lambda']) * um
nu_0 = np.concatenate((nu_0,(c / lambda_0).reshape(N_sp_lines[isp])))
Aul = np.concatenate((Aul,np.array(mol_data[isp]['A']).reshape(N_sp_lines[isp])))
gu = np.concatenate((gu,np.array(mol_data[isp]['g_u']).reshape(N_sp_lines[isp])))
gl = np.concatenate((gl,np.array(mol_data[isp]['g_l']).reshape(N_sp_lines[isp])))
E_u = np.concatenate((E_u,np.array(mol_data[isp]['E_u']).reshape(N_sp_lines[isp])))
E_l = np.concatenate((E_l,np.array(mol_data[isp]['E_l']).reshape(N_sp_lines[isp])))
custom_partition_sum_int.append(None)
if not(isinstance(custom_partition_sum_file[isp],str) or custom_partition_sum_file[isp] == None):
raise TypeError("Expected 'string' type for custom_partition_sum_file")
if custom_partition_sum_file[isp] == None:
custom_partition_sum_file[isp] = ''
if custom_partition_sum_file[isp] != '':
custom_partition_sum_int[isp] = read_partition_sums(custom_partition_sum_file[isp])
func = partial(_worker_slab,
molecule=molecule, mol_mass=mol_mass, cdn_scale_fac=cdn_scale_fac, N_sp_lines=N_sp_lines, nu_0=nu_0, Aul=Aul,
E_u=E_u, E_l=E_l, gu=gu, gl=gl, isotopolog=isotopolog,
nu_axis=nu_axis, mode=mode,
custom_partition_sum_int=custom_partition_sum_int,
nu_grid=nu_grid, R_grid=R_grid, mol_data=mol_data,
output=output, output_directory=output_directory, base_filename=output_filename_prefix,
verbose=verbose, convolve_R=convolve_R, wave_spec=wave_spec)
if isinstance(number_of_cores, str):
total_cores = os.cpu_count() or 1
if number_of_cores == 'max':
n_workers = total_cores
elif number_of_cores == 'half':
n_workers = max(1, total_cores // 2)
elif number_of_cores == 'quarter':
n_workers = max(1, total_cores // 4)
else:
n_workers = 1
else:
n_workers = number_of_cores
with ProcessPoolExecutor(max_workers=n_workers) as executor:
results = list(executor.map(func, parameter_grid))
if output in ['return', 'both']:
return results
[docs]
def run_0D_point(Ng,Tg,vturb,molecule,mol_mass,cdn_scale_fac,N_sp_lines,nu0,Aul,E_u,E_l,gu,gl,isotopolog,nu_grid,mode,custom_partition_sum_int=[None]):
'''
This runs a RT on a single point with/without line overlap.
All variables use SI unit
Ng : float
Gas column density (m-2)
Tg: float
Gas temperature (K)
vturb : float
Turbulent velocity of the gas (m/s)
molecule : string or list of strings
Molecule name, e.g. 'CO2' or ['CO2','C2H2','HCN']
mol_mass : float or list of floats
Molecular mass(es) in atomic mass units (amu)
cdn_scale_fac : list
Column density scaling factor
N_sp_lines : list
List of number of lines of each molecule
nu0 : numpy array
Line frequencies (Hz)
Aul : numpy array
Einstein A coefficient
E_u : numpy array
Upper energy level
E_l : numpy array
Lower energy level
gu : numpy array
Statistical weight / degeneracy of upper level
gl : numpy array
Statistical weight / degeneracy of lower level
isotopolog : list
List containing isotopologue numbers (HITRAN format)
nu_grid : numpy array
Array containing spectral grid for the final spectra
mode : string
'line_by_line' or 'overlap' or 'both'
custom_partition_sum_file : string
Path pointing to file containing custom partition sum (2 columns: temperature, partition sums)
'''
N_nu = len(nu_grid)
Nspecies = len(molecule)
N_lines = int(np.sum(N_sp_lines))
Ng = np.ones_like(nu0)*Ng
Tg = np.ones_like(nu0)*Tg
vg = np.zeros_like(nu0)
pop_u = np.zeros_like(nu0)
pop_l = np.zeros_like(nu0)
sigma = np.zeros_like(nu0)
Tauuu = np.zeros_like(nu0)
I_nu = np.zeros_like(nu0)
idx_start = 0
for isp in range(len(N_sp_lines)):
Ng[idx_start:idx_start+N_sp_lines[isp]] *= cdn_scale_fac[isp]
vg[idx_start:idx_start+N_sp_lines[isp]] = np.sqrt(vturb**2 + 2.0 * kb * Tg[idx_start:idx_start+N_sp_lines[isp]] / (mol_mass[isp] * amu))
if not custom_partition_sum_int[isp]==None:
QT = custom_partition_sum_int[isp](Tg[0])
else:
QT = hp.partitionSum(QT_mol_id[molecule[isp]],isotopolog[isp],Tg[0])
pop_u[idx_start:idx_start+N_sp_lines[isp]] = rs.boltzmann_distribution(E_u[idx_start:idx_start+N_sp_lines[isp]], gu[idx_start:idx_start+N_sp_lines[isp]], Tg[idx_start:idx_start+N_sp_lines[isp]], QT)
pop_l[idx_start:idx_start+N_sp_lines[isp]] = rs.boltzmann_distribution(E_l[idx_start:idx_start+N_sp_lines[isp]], gl[idx_start:idx_start+N_sp_lines[isp]], Tg[idx_start:idx_start+N_sp_lines[isp]], QT)
idx_start += N_sp_lines[isp]
Tauuu = (Aul*(c/nu0)**3/(8.0*np.pi**1.5*vg)*Ng*(gu/gl*pop_l-pop_u))
sigma = vg*nu0/c
tau_ret = np.zeros(N_nu)
I_nu_line = np.zeros(N_nu)
nu_axis = nu_grid
if mode in ['overlap', 'both']:
if nu_grid[0] > nu_grid[-1]:
nu_axis = nu_grid[::-1]
reverse_output = True
else:
reverse_output = False
half_width_factor = 10.0 # sigma_thresh/2 (20/2)
for i in range(N_lines):
center = nu0[i]
s = sigma[i]
width = half_width_factor * s
left = np.searchsorted(nu_axis, center - width)
right = np.searchsorted(nu_axis, center + width)
if left >= right:
continue
nu_slice = nu_axis[left:right]
delta = nu_slice - center
prof = rs.profile_function(nu_slice,center,s)
# np.exp(-(delta**2) / s**2)
tau_ret[left:right] += Tauuu[i]*prof
I_nu_line = rs.planck(Tg[0], nu_axis)*(1.0 - np.exp(-tau_ret))
if reverse_output:
tau_ret = tau_ret[::-1]
I_nu_line = I_nu_line[::-1]
if mode in ['line_by_line', 'both']:
I_nu = (rs.planck(Tg,nu0)*growth_function(Tauuu)*vg*nu0/c)
return (I_nu_line,tau_ret,I_nu,Tauuu,pop_l,pop_u)
[docs]
def run_1D_radial_slab(Ng,Tg,R,d,vturb,R_grid,output_filename,molecule,mol_mass,HITRANfile,cdn_scale_fac=[1],custom_partition_sum_file='',width='area_given',Rin_limit=0,Rout_limit=1e99,isotopolog=[1],wave_mol=[4,30],wave_spec=[4.9,28],verbose=False):
'''
This is a python implementation of 1D radial slab models, based on the FORTRAN version of 1D slab models in ProDiMo (https://prodimo.iwf.oeaw.ac.at/)
This runs models with line overlap. More robust and efficient code (including vertical slab model) is available in Fortran, please check the above website (especially for running large grids of models).
The Fortran code also supports OMP and supports including multiple species.
Ng : numpy array or list
Gas column density (cm-2)
Tg : numpy array or list
Gas temperature (K)
R : numpy array or list
Radius (AU)
d : float
Distance of the disk (parsec)
vturb : float
Turbulent velocity of the gas (km/s)
R_grid : float
High resolution grid on which the spectra has to be calculated initially
Recommend minimum of 1e5
output_filename : string
Filename(+path) for the fits output file
Use the extension '.fits.gz'
molecule : string
Molecule name, e.g. 'CO2'
molecule : string or list of strings
Molecule name, e.g. 'CO2' or ['CO2','C2H2','HCN']
mol_mass : float or list of floats
Molecular mass(es) in atomic mass units (amu)
HITRANfile : string or listt of strings
Path(s) to the HITRAN '.par' file(s) containing the line data downloaded from HITRAN
cdn_scale_fac : list of floats
Column density of the molecules are scaled by these factors (useful for including isotopologue ratios)
custom_partition_sum_file : string
Path pointing to file containing custom partition sum (2 columns: temperature, partition sums)
width : string or list or float
'area_given' - the provided radii array is used as the emitting area equivalent radius
'infer' - Assumes the provided radius as radial distance from the star and calculates the width of individual rings of slab grids
list - List containing emitting area equivalent radii
float - Constant emitting area equivalent radius, used for all grid points
Rin_limit : float
Inner radius limit of the disk (AU)
Rout_limit : float
Outer radius limit of the disk (AU)
isotopolog : list
List containing isotopologue numbers (HITRAN format)
wave_mol : list
Wavelength limits in microns to select the lines from the HITRAN file
wave_spec : list
Wavelength limits in microns for calculating the spectra
'''
if isinstance(output_filename,str):
tempname,tempext = os.path.splitext(output_filename)
temp2name,temp2ext = os.path.splitext(tempname)
if temp2ext+tempext == '.fits.gz':
output_filename = temp2name
else:
raise TypeError('output_filename should be a string')
if isinstance(molecule,str):
molecule = [molecule]
Nspecies = 1
Nspecies = len(molecule)
if not (isinstance(mol_mass,list) | isinstance(mol_mass,np.ndarray)):
mol_mass = [mol_mass]
if not (isinstance(cdn_scale_fac,list) | isinstance(cdn_scale_fac,np.ndarray)):
cdn_scale_fac = [cdn_scale_fac]
if not (isinstance(isotopolog,list) | isinstance(isotopolog,np.ndarray)):
isotopolog = [isotopolog]
if isinstance(HITRANfile,str):
HITRANfile = [HITRANfile]
if isinstance(custom_partition_sum_file,str):
custom_partition_sum_file = [custom_partition_sum_file]
if len(custom_partition_sum_file) != Nspecies:
custom_partition_sum_file *= Nspecies
if len(mol_mass)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of molecular masses ({len(mol_mass)}) supplied are not same!')
if len(HITRANfile)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of HITRAN filepaths ({len(HITRANfile)}) supplied are not same!')
if len(isotopolog)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of HITRAN isotopologue IDs ({len(isotopolog)}) supplied are not same!')
if len(cdn_scale_fac)!=Nspecies:
raise ValueError(f'Number of molecules ({Nspecies}) and number of column density scaling factors ({len(cdn_scale_fac)}) supplied are not same!')
nu_axis = rs.generate_grid(R=R_grid,lambda_0=wave_spec[0],lambda_n=wave_spec[1],sampling=1)*1e9
N_nu = len(nu_axis)
W_grid = c / nu_axis / um
nu_grid = nu_axis.reshape(N_nu)
W_grid = W_grid.reshape(N_nu)
Ng = np.array(Ng)*percm2_to_perm2
vturb = np.array(vturb)*kmps
N_sp_lines,mol_data,N_lines,custom_partition_sum_int = [],[],[],[]
nu_0,Aul,gu,gl,E_u,E_l = np.array([]),np.array([]),np.array([]),np.array([]),np.array([]),np.array([])
for isp in range(Nspecies):
mol_data.append(ht.read_hitran(HITRANfile[isp],molecule[isp],[isotopolog[isp]],wave_mol[0],wave_mol[1]))
N_sp_lines.append(len(mol_data[isp]))
lambda_0 = np.array(mol_data[isp]['lambda']) * um
nu_0 = np.concatenate((nu_0,(c / lambda_0).reshape(N_sp_lines[isp])))
Aul = np.concatenate((Aul,np.array(mol_data[isp]['A']).reshape(N_sp_lines[isp])))
gu = np.concatenate((gu,np.array(mol_data[isp]['g_u']).reshape(N_sp_lines[isp])))
gl = np.concatenate((gl,np.array(mol_data[isp]['g_l']).reshape(N_sp_lines[isp])))
E_u = np.concatenate((E_u,np.array(mol_data[isp]['E_u']).reshape(N_sp_lines[isp])))
E_l = np.concatenate((E_l,np.array(mol_data[isp]['E_l']).reshape(N_sp_lines[isp])))
custom_partition_sum_int.append(None)
if not(isinstance(custom_partition_sum_file[isp],str) or custom_partition_sum_file[isp] == None):
raise TypeError("Expected 'string' type for custom_partition_sum_file")
if custom_partition_sum_file[isp] == None:
custom_partition_sum_file[isp] = ''
if custom_partition_sum_file[isp] != '':
custom_partition_sum_int[isp] = read_partition_sums(custom_partition_sum_file[isp])
R = np.array(R)*au
Rin_limit *= au
Rout_limit *= au
Rr = np.zeros_like(R)
if width == 'infer':
if len(R)>1:
if ((R[1]-R[0])/2>R[0]) or ((R[1]-R[0])/2>(R[0]-Rin_limit)) :
Rr[0] = (R[0]+(R[1]-R[0])/2)**2-Rin_limit**2
else:
Rr[0] = (R[0]+(R[1]-R[0])/2)**2-(R[0]-(R[1]-R[0])/2)**2
if ((R[-2]-R[-1])/2>(Rout_limit-R[-1])) :
Rr[-1] = Rout_limit**2-(R[-2]+(R[-1]-R[-2])/2)**2
else:
Rr[-1] = ((R[-1]-R[-2])/2+R[-1])**2-(R[-1]-(R[-1]-R[-2])/2)**2
if len(R)>2:
for i in range(len(Ng)-2):
Rr[i+1] = (R[i+1]+(R[i+2]-R[i+1])/2)**2-(R[i+1]-(R[i+1]-R[i])/2)**2
else:
Rr = R**2
elif width == 'area_given':
Rr = R**2
elif isinstance(width,float) or isinstance(width,int):
Rr+=width**2
else:
for i in range(len(Ng)):
Rr[i] = (R[i]+width[i]/2)**2-(R[i]-width[i]/2)**2
d *= pc
if isinstance(vturb,float) or isinstance(vturb,int): vturb = np.ones_like(Ng)*vturb
total_flux = np.zeros_like(nu_grid)
for i in range(len(Ng)):
I_nu_line,tau_ret,I_nu,tau_lbl,pop_l,pop_u = run_0D_point(Ng[i],Tg[i],vturb[i],molecule,mol_mass,cdn_scale_fac,N_sp_lines,nu_0,Aul,E_u,E_l,gu,gl,isotopolog,nu_grid,'overlap',custom_partition_sum_int)
total_flux += I_nu_line*np.pi*Rr[i]/d**2
slab = convert_to_slab_format(nu_grid,total_flux,tau_ret*0.0,I_nu*0.0,tau_lbl*0.0,R_grid,0,0,[0],0,0,0,0,molecule,N_sp_lines,'overlap')
slab.write_to_file(output_filename,'overlap',verbose=verbose)
return
[docs]
def growth_function(tau):
'''
This function returns the growth function, or the flux growth for the given line optical depth(s)
tau : numpy array or list
line optical depth
'''
tau = np.asanyarray(tau)
growth = np.zeros_like(tau)
cond2 = (tau >= 1e-13) & (tau < 10)
cond3 = (tau >= 10)
t2 = tau[cond2]
if t2.size > 0:
growth[cond2] = (7.98554359 * t2**0.9999470274) / (4.509393063 + 1.72466517 * t2**1.062901665)
t3 = tau[cond3]
if t3.size > 0:
growth[cond3] = 2.8859693 * (np.log10(t3)**0.50868585) + 0.40682335
return growth
[docs]
def get_dust_opacity_provider(dust_sizes, wavelengths, Q_abs_table):
"""
Factory that returns a pre-configured interpolation function.
"""
# Log-prep the grid
log_sizes = np.log10(dust_sizes)
log_waves = np.log10(wavelengths)
log_Q = np.log10(Q_abs_table + 1e-99)
# Initialize the engine
interp_engine = RegularGridInterpolator(
(log_sizes, log_waves), log_Q,
method='linear', bounds_error=False, fill_value=None
)
def get_Q(d_size, wave):
# Broadcast to ensure shapes match (handles scalar wave vs array size)
d_size, wave = np.broadcast_arrays(d_size, wave)
# Flatten for the interpolator engine
query_points = np.stack([np.log10(d_size + 1e-99),
np.log10(wave + 1e-99)], axis=-1)
return 10**interp_engine(query_points)
return get_Q
[docs]
class slab_model_1D:
"""
class:: slab_model_1D
Class to run 1D forward slab models
"""
def __init__(self):
self.path=None
self.lambda_0=None
self.lambda_n=None
self.NLAMBDA=None
self.R_grid=None
self.nu_grid=None
self.wavelength_grid=None
self.total_source_function=None
self.flux_dust_only = None
self.dust_source_function = None
self.tau_grid=None
self.tau_dust_grid=None
self.flux=None
self.convWavelength=None
self.convFlux=None
self.convR=None
self.convContFlux=None
self.continuum=None
self.amin=None
self.amax=None
self.apow=None
self.da=None
self.NSIZE=None
self.dust_to_gas=None
self.fsize=None
self.asize=None
self.Qabs=None
self.em_coeff_dust=None
self.ab_coeff_dust=None
self.nd=None
self.Td=None
self.dust_settle=None
self.dust_settle_alpha=None
self.grain_density=None #kg/m3
self.linedata=None
self.leveldata=None
self.LineSelection=None
self.rules=None
self.mol_data=None
self.NGRID=None
self.z=None
self.dz=None
self.r_au=None
self.M_star=None
self.ng=None
self.Tg=None
self.nH=None
self.rhog=None
self.Nspecies=None
self.em_coeff_gas=None
self.ab_coeff_gas=None
self.vturb=None
self.vg=None
self.Hg=None
self.Qmatrix_file=None
self.Qmatrix_amin=None
self.Qmatrix_amax=None
self.Qmatrix_asize=None
self.Qmatrix_NSIZE=None
self.Qmatrix_NLAMMAX=None
self.Qmatrix_NDUST=None
self.Qmatrix_lammin=9.12e-6 #cm
self.Qmatrix_lammax=1 #cm
self.Qmatrix_lam=None
self.Qmatrix_Qabs=None
self.verbose=True
[docs]
def unit_conversions(self):
self.z *= au
if not (self.nd is None):
self.nd *= percm3_to_perm3
self.ng *= percm3_to_perm3
self.nH *= percm3_to_perm3
self.vturb *= kmps
if not (self.rhog is None):
self.rhog *= gpercm3_to_kgperm3
if not (self.grain_density is None):
self.grain_density *= gpercm3_to_kgperm3
self.amin*=um_to_m
self.amax*=um_to_m
[docs]
def reset_units(self):
self.z /= au
if not (self.nd is None):
self.nd /= percm3_to_perm3
self.ng /= percm3_to_perm3
self.nH /= percm3_to_perm3
self.vturb /= kmps
if not (self.rhog is None):
self.rhog /= gpercm3_to_kgperm3
if not (self.grain_density is None):
self.grain_density /= gpercm3_to_kgperm3
self.amin/=um_to_m
self.amax/=um_to_m
[docs]
def init_grid(self):
if self.verbose: print('INIT GRID')
self.nu_grid=rs.generate_grid(R=self.R_grid,lambda_0=self.lambda_0,lambda_n=self.lambda_n,sampling=1)*1e9
self.wavelength_grid = c/self.nu_grid
self.NLAMBDA=len(self.nu_grid)
self.total_source_function=np.zeros((self.NGRID,self.NLAMBDA))
self.tau_grid=np.zeros((self.NGRID,self.NLAMBDA))
self.tau_dust_grid=np.zeros((self.NGRID,self.NLAMBDA))
self.dz = np.gradient(self.z)
self.dust_source_function = np.zeros((self.NGRID, self.NLAMBDA))
[docs]
def read_Qmatrix(self):
if self.verbose: print('READ QMATRIX')
if not(self.Qmatrix_file==None):
hdul=fits.open(self.Qmatrix_file)
self.Qmatrix_amin=hdul[0].header['AMIN']
self.Qmatrix_amax=hdul[0].header['AMAX']
self.Qmatrix_NSIZE=hdul[0].header['NAXIS5']
self.Qmatrix_NLAMMAX=hdul[0].header['NAXIS6']
self.Qmatrix_NDUST=hdul[0].header['NDUST']
# self.Qmatrix_lammin=91.2e-3#hdul[0].header['LAMMIN']
# self.Qmatrix_lammax=1e4#hdul[0].header['LAMMAX']
self.Qmatrix_Qabs=hdul[0].data[2,:,:,0,0,0,0].T
self.Qmatrix_amin*=cm_to_m
self.Qmatrix_amax*=cm_to_m
self.Qmatrix_lammin*=cm_to_m
self.Qmatrix_lammax*=cm_to_m
[docs]
def init_dust(self):
if self.verbose: print('INIT DUST')
self.Qmatrix_lam=np.logspace(np.log10(self.Qmatrix_lammin),np.log10(self.Qmatrix_lammax),self.Qmatrix_NLAMMAX)
self.Qmatrix_asize=np.logspace(np.log10(self.Qmatrix_amin),np.log10(self.Qmatrix_amax),self.Qmatrix_NSIZE)
self.asize=np.logspace(np.log10(self.amin),np.log10(self.amax),self.NSIZE)
self.da=np.zeros(self.NSIZE)
diff=0.5*np.diff(self.asize)
self.da[1:]+=diff
self.da[:-1]+=diff
temp_fsize=(1-self.apow)/(self.amax**(1-self.apow)-self.amin**(1-self.apow))*self.asize**(-self.apow)
integral = np.sum(temp_fsize * self.da)
temp_fsize /= integral
self.fsize=np.repeat(temp_fsize.reshape((self.NSIZE,1)),[self.NGRID],axis=1).T
if self.nd is None:
if self.rhog is None:
rho_gas=self.nH*mu*amu
else:
rho_gas=self.rhog
grain_masses=(4.0/3.0)*np.pi*self.asize**3*self.grain_density
mean_grain_mass=np.sum(grain_masses*temp_fsize*self.da)
self.nd = (rho_gas*self.dust_to_gas)/mean_grain_mass
if self.dust_settle:
self.solve_dust_settle(r_au=self.r_au, M_star=self.M_star, grain_density=self.grain_density)
get_Q = get_dust_opacity_provider(self.Qmatrix_asize, self.Qmatrix_lam, self.Qmatrix_Qabs)
sizes_2d, waves_2d = np.meshgrid(self.asize, self.wavelength_grid, indexing='ij')
q_grid = get_Q(sizes_2d.ravel(), waves_2d.ravel()).reshape(sizes_2d.shape)
self.ab_coeff_dust=np.zeros((self.NGRID,self.NLAMBDA))
for i in range(self.NGRID):
geometric_term = (self.fsize[i,:]*np.pi*self.asize**2*self.da)[:, np.newaxis]
self.ab_coeff_dust[i,:] = self.nd[i]*np.sum(geometric_term*q_grid, axis=0)
del q_grid,sizes_2d,waves_2d,geometric_term
[docs]
def solve_dust_settle(self, r_au=1.0, M_star=1.0, grain_density=3000):
"""
r_au: Distance from star in AU
M_star: Stellar mass in Solar Masses
grain_density: kg/m^3 (e.g., 3000 for silicates)
"""
if self.verbose: print('SOLVE DUST SETTLE')
r_m = r_au*au
omega = np.sqrt(G*(M_star*M_sun)/r_m**3)
v_th = np.sqrt((8*kb*self.Tg)/(np.pi*mu*amu))
A, RHO = np.meshgrid(self.asize,self.nH*mu*amu)
VTH, _ = np.meshgrid(v_th, self.asize, indexing='ij')
stokes = (grain_density*A)/(RHO*VTH)*omega
alpha = self.dust_settle_alpha
h_ratio = np.sqrt(alpha/(alpha+stokes))
v_sound = np.sqrt((kb*self.Tg)/(mu*amu))
Hg = v_sound/omega
Hg_2d = np.repeat(Hg[:,np.newaxis], self.NSIZE, axis=1)
Z, _ = np.meshgrid(self.z, self.asize, indexing='ij')
settling_factor = np.exp(-(Z**2)/(2*(Hg_2d*h_ratio)**2))
f_mid = (1-self.apow)/(self.amax**(1-self.apow)-self.amin**(1-self.apow))*self.asize**(-self.apow)
density_drop = np.sum(f_mid*settling_factor*self.da, axis=1)
mask = density_drop > 1e-100
self.nd = self.nd*density_drop
new_fsize_unnorm = f_mid*settling_factor
self.fsize = np.zeros_like(new_fsize_unnorm)
for i in range(self.NGRID):
if mask[i]:
self.fsize[i, :] = new_fsize_unnorm[i, :]/density_drop[i]
else:
self.fsize[i, :] = f_mid
[docs]
def init_gas(self):
if self.verbose: print('INIT GAS')
self.rules = ht.read_line_selection(self.LineSelection)
self.mol_data = [ht.read_hitran_from_rules(rule) for rule in self.rules]
self.vg = np.zeros((self.NGRID,len(self.rules)))
for i in range(len(self.rules)):
self.vg[:,i] = np.sqrt(self.vturb**2+2.0*kb*self.Tg/(self.rules[i].mass*amu))
for i in range(len(self.rules)):
self.mol_data[i]['nu_Hz'] = (c/(np.array(self.mol_data[i]['lambda'])*um))
[docs]
def calculate_source(self, igrid):
N_nu = len(self.nu_grid)
nu_axis = self.nu_grid.copy()
is_reversed = False
if nu_axis[0]>nu_axis[-1]:
nu_axis = nu_axis[::-1]
is_reversed = True
numerator = np.zeros_like(nu_axis)
denominator = np.zeros_like(nu_axis)
iTg = self.Tg[igrid]
iTd = self.Td[igrid]
kappa_dust = self.ab_coeff_dust[igrid, :].copy()
if is_reversed:
kappa_dust = kappa_dust[::-1]
numerator += rs.planck(iTd, nu_axis)*kappa_dust
denominator += kappa_dust
half_width_factor = 10.0
for imol in range(len(self.rules)):
ing = self.ng[imol, igrid]
mol = self.mol_data[imol]
ivg = self.vg[igrid, imol]
inu0 = np.array(mol['nu_Hz'])
iQT = hp.partitionSum(QT_mol_id[self.rules[imol].name.split('_')[0]], self.rules[imol].iso, iTg)
ipop_u = rs.boltzmann_distribution(np.array(mol['E_u']), np.array(mol['g_u']), iTg, iQT)
ipop_l = rs.boltzmann_distribution(np.array(mol['E_l']), np.array(mol['g_l']), iTg, iQT)
kappa_g_base = (np.array(mol['A'])*(c/inu0)**3/(8.0*np.pi**1.5*ivg)*ing*(np.array(mol['g_u'])/np.array(mol['g_l'])*ipop_l-ipop_u))
j_g_base = h*c*ing*ipop_u*np.array(mol['A'])/(4*np.pi**1.5*ivg)
sigma = ivg*inu0/c
for i in range(len(inu0)):
center = inu0[i]
s = sigma[i]
width = half_width_factor*s
idx_l = np.searchsorted(nu_axis, center-width)
idx_r = np.searchsorted(nu_axis, center+width)
if idx_l >= idx_r: continue
prof = rs.profile_function(nu_axis[idx_l:idx_r], center, s)
numerator[idx_l:idx_r] += j_g_base[i]*prof
denominator[idx_l:idx_r] += kappa_g_base[i]*prof
source_work = np.divide(numerator, denominator, out=np.zeros_like(numerator), where=denominator!=0)
tau_work = denominator*self.dz[igrid]
if is_reversed:
self.total_source_function[igrid, :] = source_work[::-1]
self.tau_grid[igrid, :] = tau_work[::-1]
else:
self.total_source_function[igrid, :] = source_work
self.tau_grid[igrid, :] = tau_work
[docs]
def calculate_dust_source(self, igrid):
"""Calculates source function and tau for dust only."""
kappa_d = self.ab_coeff_dust[igrid, :]
self.dust_source_function[igrid, :] = rs.planck(self.Td[igrid], self.nu_grid)
self.tau_dust_grid[igrid,:] = kappa_d*self.dz[igrid]
[docs]
def calculate_spectrum(self, I_background=None):
"""
Calculates the emergent intensity I_nu from the 1D slab.
I_background: Optional array of size (NLAMBDA) for radiation
entering from the bottom (e.g., stellar/disk midplane).
"""
if self.verbose: print('CALCULATE SPECTRUM')
if I_background is None:
I_nu = np.zeros(self.NLAMBDA)
else:
I_nu = I_background
for i in range(self.NGRID):
S_nu = self.total_source_function[i, :]
d_tau = self.tau_grid[i, :]
transmission = np.exp(-d_tau)
I_nu = I_nu*transmission+S_nu*(1.0-transmission)
return I_nu
[docs]
def calculate_dust_continuum(self,I_background=None):
if self.verbose: print('CALCULATE DUST CONTINUUM')
if I_background is None:
I_nu = np.zeros(self.NLAMBDA)
else:
I_nu = I_background
for i in range(self.NGRID):
S_nu = self.dust_source_function[i, :]
d_tau = self.tau_dust_grid[i, :]
d_tau = np.clip(d_tau, 0, 500)
transmission = np.exp(-d_tau)
I_nu = I_nu*transmission+S_nu*(1.0-transmission)
return I_nu
[docs]
def run_model(self):
if self.verbose: print('RUNNING MODEL-\n')
self.unit_conversions()
self.init_grid()
self.init_gas()
self.read_Qmatrix()
self.init_dust()
for i in tqdm(range(self.NGRID), desc="Calculating source function:"):
self.calculate_source(i)
self.calculate_dust_source(i)
# for i in range(self.NGRID):
# self.calculate_source(i)
# self.calculate_dust_source(i)
self.flux = self.calculate_spectrum()
self.continuum = self.calculate_dust_continuum()
[docs]
def convolve(self,R=1,lambda_0=1,lambda_n=1):
if self.verbose: print('CONVOLVE SPECTRUM')
if lambda_0==lambda_n:
lambda_0=np.amin(self.wavelength_grid)*1e6
lambda_n=np.amax(self.wavelength_grid)*1e6
lambda_0=lambda_0*10**-0.05
lambda_n=lambda_n*10**0.05
mask=(self.wavelength_grid<lambda_n*1e-6)&(self.wavelength_grid>lambda_0*1e-6)
FWHM=self.R_grid/R/2.355
g=Gaussian1DKernel(stddev=FWHM,factor=7)
self.convFlux=apy_convolve(self.flux[mask],g)
self.convContFlux=apy_convolve(self.continuum[mask],g)
self.convWavelength=self.wavelength_grid[mask]*1e6
self.convR=R
[docs]
def convolve_custom(self,values,R=1,lambda_0=1,lambda_n=1):
if self.verbose: print('CONVOLVE CUSTOM')
if lambda_0==lambda_n:
lambda_0=np.amin(self.wavelength_grid)*1e6
lambda_n=np.amax(self.wavelength_grid)*1e6
lambda_0=lambda_0*10**-0.05
lambda_n=lambda_n*10**0.05
mask=(self.wavelength_grid<lambda_n*1e-6)&(self.wavelength_grid>lambda_0*1e-6)
FWHM=self.R_grid/R/2.355
g=Gaussian1DKernel(stddev=FWHM,factor=7)
return(apy_convolve(values[mask],g))
[docs]
def from_prodimo(self,model_path,r,species=[],LineSelectionFile='LineSelection.in',MieFile='',lambda_0=4.9,lambda_n=28,R_grid=1e5):
if self.verbose: print('FROM PRODIMO')
model_2d = pread.read_prodimo(model_path)
ix = np.argmin(np.abs(model_2d.x[:,0]-r))
if isinstance(species,list):
isps = [model_2d.spnames[sp] for sp in species]
elif isinstance(species,str):
isps = [model_2d.spnames[species]]
else:
raise ValueError("species should be either list of strings, or a string")
if MieFile != '':
self.Qmatrix_file = MieFile
else:
self.Qmatrix_file = model_path+'/Mie.fits.gz'
self.path=model_path
self.lambda_0=lambda_0
self.lambda_n=lambda_n
self.R_grid=R_grid
self.amin=model_2d.params['amin']*1e4
self.amax=model_2d.params['amax']*1e4
self.apow=model_2d.params['apow']
self.NSIZE=50
self.dust_to_gas=model_2d.params['dust_to_gas_c']
self.dust_settle=True
self.dust_settle_alpha=model_2d.params['a_settle']
self.grain_density=3 #g/cm3
self.NGRID=model_2d.params['nzz']
self.M_star=model_2d.star.mass
self.r_au=r*1.0
self.z=model_2d.z[ix,:].copy()
self.Td=model_2d.td[ix,:]
self.nH=model_2d.nHtot[ix,:].copy()
self.rhog=model_2d.rhog[ix,:].copy()
self.ng=np.array([model_2d.nmol[ix,:,isp] for isp in isps])
self.nd=model_2d.nd[ix,:]
self.Tg=np.clip(model_2d.tg[ix,:].copy(), None, 5000)
self.vturb=np.ones_like(self.Tg)*model_2d.params['v_turb']*1e-3
self.LineSelection = LineSelectionFile