Source code for prodimopy.interface2D.infile

import os
from copy import deepcopy

import numpy as np
from numpy.typing import NDArray
from prodimopy.read import Data_ProDiMo, DataDust

# FIXME: not nice, but use the au to cm conversion constant from ProDiMo
# just to be consistent
autocm = 1.495978700e13


[docs] class DustSizeDistribution(object): """ Helper data container for the dust size distribution. The assumption here is that all grains have the same grain mass density (i.e. only one population but with different sizes). """ def __init__(self, asize, fsize_rho): self.asize: NDArray[np.float64] = asize """ : the dust grain size (radius). UNIT: cm, DIM: NSIZE (number of grain sizes) """ self.fsize_rho: NDArray[np.float64] = fsize_rho """ : Grain size distribution function in absolute numbers. UNIT: |gcm^-3|, DIM: (NSIZE,NX,NZ) """
[docs] def fill_pmodel(self,pmodel: Data_ProDiMo): """ Fills the dust data from the given dust size distribution. In particular it initializes `pmodel.dust`. Parameters ---------- pmodel: The ProDiMo model object. """ pmodel.dust = DataDust(0.0, 0.0, 0.0, 0, 0) pmodel.dust.amin = np.min(self.asize) pmodel.dust.amax = np.max(self.asize) pmodel.dust.nsize = len(self.asize) pmodel.dust.asize = self.asize * 1.0e4 # in ProDiMo it is micron pmodel.dust.fsize_rho = self.fsize_rho pmodel.rhod = np.sum(pmodel.dust.fsize_rho, axis=(0)) pmodel.d2g = pmodel.rhod / pmodel.rhog return pmodel
[docs] class Interface2Din(object): """ Some utility class to generate input files for the |prodimo| 2D interface. .. todo: - it might make sense to actually use the :class:`~prodimopy.read.Data_ProDiMo` class to fill the values from the 2D input structure (e.g. from a hydro model). That would then also to directly plot the input. One could get rid of the :func:`get_pmodel` method. """ def __init__( self, x: NDArray[np.float64], z: NDArray[np.float64], rhoGas: NDArray[np.float64] | None = None, nHtot: NDArray[np.float64] | None = None, velocity: NDArray[np.float64] | None = None, g2d: NDArray[np.float64] | None = None, tdust: NDArray[np.float64] | None = None, tgas: NDArray[np.float64] | None = None, dustSDF: DustSizeDistribution | None = None, ): """ Parameters ---------- x the x (radial coordinates) in cm. DIM: (NX,NZ) z the z (vertical coordinates) in cm. DIM: (NX,NZ) rhoGas gas density in |gcm^-3|. Either `nHtot` or `rhoGas` have to be provided. DIM: (NX,NZ) nHtot hydrogen number density in (|cm^-3| velocity velocity field (vx,vy,vz) in cm/s g2d gas to dust mass ratio tdust dust temperature in K tgas gas temperature in K dustSDF The dust size distribution function Attributes ---------- """ if nHtot is None and rhoGas is None: raise Exception("Either rhoGas or nHtot have to be specified.") self.nx: int = x.shape[0] """ number of x grid points """ self.nz: int = x.shape[1] """ number of z grid points """ self.x: NDArray[np.float64] = x """ x coordinates in cm (Cartesian) """ self.z: NDArray[np.float64] = z """ z coordinates in cm (Cartesian) """ self.rhoGas: NDArray[np.float64] | None = rhoGas """ gas density in |gcm^-3| """ self.nHtot: NDArray[np.float64] | None = nHtot """ total hydrogen number density in |cm^-3| """ self.velocity: NDArray[np.float64] | None = velocity """ velocity vector vx,vy,vz in |cms^-1| """ self.g2d: NDArray[np.float64] | None = g2d """ Gas to dust mass ratio at every point. """ self.tdust: NDArray[np.float64] | None = tdust """ Dust temperature in K. """ self.tgas: NDArray[np.float64] | None = tgas """ Gas temperature in K. """ self.dustSDF: DustSizeDistribution | None = dustSDF """ Dust size distribution function """
[docs] def interpol_spherical( self, pmodel: Data_ProDiMo, imethod: str = "linear", fixmethod: int = 1 ): """ Interpolation onto the |prodimo| assuming the input grid was spherical. E.g. the routines tries to deal with the curvatures at the inner and outer edge. Parameters ---------- pmodel : the prodimopy model onto which the input structure should be interpolated. imethod : interpolation method passed to `scipy.interpolate.griddata`. fixmethod : Method to fix some problems in the interpolated grid. Doesn't necessarily work well, so if you don't need it don't use it. 0 or None: no fixing 1: fix vertical columns first (density should always increase towards midplane), Default. 2: similar to 1 but different approach (not sure any more) 3: fix only the azimuthal velocity vy (should never be negative in the disk region) """ from scipy.interpolate import griddata points = np.array([self.x.flatten() / autocm, self.z.flatten() / autocm]).T # can only use nearest, because griddata does not extrapolation ... and that causes problems # f = interpolate.interp2d(self.x.flatten()/AU, self.z.flatten()/AU, self.nHtot.flatten(), kind='linear',copy=False) if self.nHtot is not None: print("- Interpolate nHtot ...") pnHtot = griddata(points, self.nHtot.flatten(), (pmodel.x, pmodel.z), method=imethod) else: pnHtot = None if self.rhoGas is not None: print("- Interpolate rhoGas ...") prhoGas = griddata(points, self.rhoGas.flatten(), (pmodel.x, pmodel.z), method=imethod) else: prhoGas = None if self.velocity is not None: print("- Interpolate velocity ...") pvel = np.zeros(shape=(pmodel.nx, pmodel.nz, 3)) for vi in range(3): pvel[:, :, vi] = griddata( points, self.velocity[:, :, vi].flatten(), (pmodel.x, pmodel.z), method=imethod ) else: pvel = None # TODO: the following quantities are not included in the various fixing algorithms if self.g2d is not None: print("- Interpolate g2d ...") pg2d = griddata(points, self.g2d.flatten(), (pmodel.x, pmodel.z), method=imethod) else: pg2d = None if self.tdust is not None: print("- Interpolate tdust ...") ptdust = griddata(points, self.tdust.flatten(), (pmodel.x, pmodel.z), method=imethod) else: ptdust = None if self.tgas is not None: print("- Interpolate tgas ...") ptgas = griddata(points, self.tgas.flatten(), (pmodel.x, pmodel.z), method=imethod) else: ptgas = None if self.dustSDF is not None: print("- Interpolate dust size distribution ...") na = len(self.dustSDF.asize) pdustSDF = DustSizeDistribution( self.dustSDF.asize, np.zeros(shape=(na, pmodel.nx, pmodel.nz)), ) for i in range(na): pdustSDF.fsize_rho[i, :, :] = griddata( points, self.dustSDF.fsize_rho[i, :, :].flatten(), (pmodel.x, pmodel.z), method=imethod, ) # There is some problem at the z=NZZ (border of the disk), for now simply set the value ot nZZ-1 pdustSDF.fsize_rho[:, :, -1] = pdustSDF.fsize_rho[:, :, -2] else: pdustSDF = None # assumes grid is from spherical so height cannot be larger then the radius # make some cut-off grid from spherical coordinates so height cannot be larger then the radius cutoff = np.max(self.x) / autocm # if the prodimo grid is smaller then the input grid, then take the cut-off from the prodimo grid cutoff = np.min([pmodel.x[-1, 0], cutoff]) print("Cutoff radius: ", cutoff) # print(pmodel.z) pnx = len(pmodel.x[:, 0]) pnz = len(pmodel.z[0, :]) # density to check for NaN etc. pdens = pnHtot if pdens is None: pdens = prhoGas # Make sure the the midplane is filled # Another security check ... seems that in some grids the z=0 coordinate has NHtot= 0 or close to zero # check if it all the points are or close to zero in the midplane. if np.all(pdens[:, 0] < 1.0e-40) or np.all(np.isnan(pdens[:, 0])): print("Fixing midplane ...") pdens[:, 0] = pdens[:, 1] if pg2d is not None: pg2d[:, 0] = pg2d[:, 1] if pvel is not None: pvel[:, 0, :] = pvel[:, 1, :] if ptdust is not None: ptdust[:, 0] = ptdust[:, 1] if ptgas is not None: ptgas[:, 0] = ptgas[:, 1] if np.all(pdens[:, -1] < 1.0e-50) or np.all(np.isnan(pdens[:, -1])): print("Fixing z border ...") for arr in [pdens, pg2d, ptdust, ptgas]: if arr is not None: arr[:, -1] = arr[:, -2] if pvel is not None: pvel[:, -1, :] = pvel[:, -2, :] # fix outer vertical column if necessary if np.isnan(pdens[pnx - 1, 0]): pdens[pnx - 1, :] = pdens[pnx - 2, :] if pg2d is not None: pg2d[pnx - 1, :] = pg2d[pnx - 2, :] if pvel is not None: pvel[pnx - 1, :, :] = pvel[pnx - 2, :, :] if ptdust is not None: ptdust[pnx - 1, :] = ptdust[pnx - 2, :] if ptgas is not None: ptgas[pnx - 1, :] = ptgas[pnx - 2, :] # get rid of values outside the computational domain cutmask = pmodel.z > (cutoff * 1.1) pdens[cutmask] = 0.0 if pvel is not None: for vi in range(3): pvel[cutmask, vi] = 0.0 for arr in [pdens, pvel, ptdust, ptgas]: if arr is None: continue cnan = np.isnan(arr) if cnan.any(): print("Found NaNs set them to zero.") arr[cnan] = 0.0 # special treatment for g2d if it is nan set it to a large number if pg2d is not None: cnan = np.isnan(pg2d) if cnan.any(): print("Found NaNs in g2d set them to 1.e10.") pg2d[cnan] = 1.0e10 # special treatment for dustsizedistribution if pdustSDF is not None: cnan = np.isnan(pdustSDF.fsize_rho) if cnan.any(): print("Found NaNs in DustSizeDistribution set them to 0.0.") pdustSDF.fsize_rho[cnan] = 0.0 # TODO: check this again, doesn't seem to work very good if fixmethod == 1: # FIXME: I think all this is really not very general # fix some stuff in the inner region ... because of the spherical grid of MOCASSIN/PLUTO # this assumes that the density always increases towards the midplane (should be the case for disks) for ix in range(pnx): # print(ix) izmax = np.min( (pnz - 2, np.argmin(np.abs(pmodel.z[ix, :] / pmodel.x[ix, :] - 0.5))) ) # print("izmax: ",izmax) for iz in range(izmax, -1, -1): # from top to bottom # print(ix,iz,pmodel.x[ix,iz],pmodel.z[ix,iz],pdens[ix,iz]) if pdens[ix, iz] < pdens[ix, iz + 1]: print( "Fix (1): ", ix, iz, pmodel.x[ix, iz], pmodel.z[ix, iz], "{:15.5e}".format(pdens[ix, iz]), "{:15.5e}".format(pdens[ix, iz + 1]), ) pdens[ix, iz] = pdens[ix, iz + 1] if pg2d is not None: pg2d[ix, iz] = pg2d[ix, iz + 1] if pvel is not None: pvel[ix, iz, 0] = pvel[ix, iz + 1, 0] # pvel[ix,iz,1]=pvel[ix,iz+1,1] pvel[ix, iz, 2] = pvel[ix, iz + 1, 2] # check explicitly for vy if (pvel is not None) and pvel[ix, iz, 1] < pvel[ix, iz + 1, 1]: pvel[ix, iz, 1] = pvel[ix, iz + 1, 1] elif fixmethod == 2: for ix in range(pnx - 1): for iz in range(pnz - 2, -1, -1): # from top to bottom # print(ix,iz,pmodel.x[ix,iz],pmodel.z[ix,iz]) if pdens[ix, iz] < pdens[ix, iz + 1]: print( "Fix (2): ", ix, iz, pmodel.x[ix, iz], pmodel.z[ix, iz], "{:15.5e}".format(pdens[ix, iz]), "{:15.5e}".format(pdens[ix + 1, iz]), ) pdens[ix, iz] = pdens[ix + 1, iz] if pg2d is not None: pg2d[ix, iz] = pg2d[ix + 1, iz] if pvel is not None: pvel[ix, iz, 0] = pvel[ix + 1, iz, 0] pvel[ix, iz, 2] = pvel[ix + 1, iz, 2] # check explicitly for vy if (pvel is not None) and pvel[ix, iz, 1] < pvel[ix, iz + 1, 1]: pvel[ix, iz, 1] = pvel[ix + 1, iz, 1] elif fixmethod == 3: # fixes only the vy ... in the disk should never be zero for iz in range(pnz - 2): # from bottom to top for ix in range(pnx - 2, -1, -1): # outside to inside if ( pvel is not None and pvel[ix, iz, 1] < pvel[ix + 1, iz, 1] and pvel[ix, iz, 1] <= 0.0 and (pmodel.z[ix, iz] / pmodel.x[ix, iz]) < 1.0 ): # print("Fix (3): ",ix,iz,pmodel.x[ix,iz],pmodel.z[ix,iz],"{:15.5e}".format(pdens[ix,iz]),"{:15.5e}".format(pdens[ix+1,iz])) print( "Fix (3): ", ix, iz, pmodel.x[ix, iz], pmodel.z[ix, iz], pvel[ix, iz, 1], pvel[ix + 1, iz, 1], ) # print("Fix (3): ",ix,iz,pmodel.x[ix,iz],pmodel.z[ix,iz],"{:15.5e}".format(pvel[ix,iz,1]),"{:15.5e}".format(pvel[ix+1,iz,1])) pvel[ix, iz, 1] = pvel[ix + 1, iz, 1] # does not really work, because the density can drop towards the inner radius # f = interpolate.interp1d(np.log10(pmodel.x[ix+1:,iz]), np.log10(pdens[ix+1:,iz]), fill_value = "extrapolate") # pdens[ix,iz]=10**f(np.log10(pmodel.x[ix,iz])) # if np.isnan(pdens[ix,iz]): pdens[ix,iz]=pdens[ix+1,iz] # pdens[ix,iz]=pdens[ix+1,iz] # if pg2d is not None: pg2d[ix,iz]=pg2d[ix+1,iz] return prhoGas, pnHtot, pvel, pg2d, ptdust, ptgas, pdustSDF
[docs] def toProDiMo( self, pmodel: Data_ProDiMo, outdir: str = ".", imethod: str = "linear", fixmethod: int = 1 ) -> Data_ProDiMo: """ Interpolates the given density/velocity structure onto the |prodimo| grid and writes the input files for ProDimo. The new quantities are written into a copy of `pmodel` and returned. Parameters ---------- pmodel : the prodimopy model onto which the input structure should be interpolated. outdir : output directory where the |prodimo| input files are written to. imethod : interpolation method passed to `scipy.interpolate.griddata`. For details see :func:`interpol_spherical`. fixmethod : method used to fix the input structure after interpolation. For details see :func:`interpol_spherical`. .. todo: - make it general (e.g. file names) - maybe make the interpol routines return alreayd a ProDiMo_Data object """ print("Do the interpolation ...") # prhoGas, pnHtot, pvel, pg2d, ptdust, ptgas, pDDF = self.interpol_spherical( pmodel, imethod=imethod, fixmethod=fixmethod ) # FIXME: Make this consistent with get_pmodel or actually use get_pmodel newpmodel = deepcopy(pmodel) if prhoGas is not None: newpmodel.rhog = prhoGas if pnHtot is not None: newpmodel.nHtot = pnHtot if pg2d is not None: newpmodel.d2g = 1.0 / pg2d if prhoGas is not None: newpmodel.rhod = prhoGas / pg2d if pvel is not None: newpmodel.velocity = pvel / 1.0e5 # in prodimopy it is km/s if ptdust is not None: newpmodel.td = ptdust if ptgas is not None: newpmodel.tg = ptgas if pDDF is not None: newpmodel=pDDF.fill_pmodel(newpmodel) print("Create the files ...") self.write_text(prhoGas, pnHtot, pvel, pg2d, ptdust, ptgas, pDDF, outdir) return newpmodel
[docs] def write_text( self, prhoGas: NDArray[np.float64] | None, pnHtot: NDArray[np.float64] | None, pvel: NDArray[np.float64] | None, pg2d: NDArray[np.float64] | None, ptdust: NDArray[np.float64] | None, ptgas: NDArray[np.float64] | None, pDustSizeDistribution: DustSizeDistribution | None, outdir: str, ): """ Write the input files for |prodimo| in text format. All generated files have the prefix `pluto_` and the postfix `.dat`. Parameters ---------- outdir The output directory (e.g. your initial ProDiMo model). """ if prhoGas is not None: ofile = os.path.join(outdir, "pluto_rho.dat") print(ofile) np.savetxt(outdir + "/pluto_rho.dat", prhoGas) if pnHtot is not None: np.savetxt(outdir + "/pluto_dens.dat", pnHtot) if pvel is not None: ofile = os.path.join(outdir, "pluto_vx.dat") print(ofile) np.savetxt(ofile, pvel[:, :, 0]) ofile = os.path.join(outdir, "pluto_vy.dat") print(ofile) np.savetxt(ofile, pvel[:, :, 1]) ofile = os.path.join(outdir, "pluto_vz.dat") print(ofile) np.savetxt(ofile, pvel[:, :, 2]) if pg2d is not None: ofile = os.path.join(outdir, "pluto_gd.dat") print(ofile) np.savetxt(ofile, pg2d) if ptdust is not None: ofile = os.path.join(outdir, "pluto_tdust.dat") print(ofile) np.savetxt(ofile, ptdust) if ptgas is not None: ofile = os.path.join(outdir, "pluto_tgas.dat") print(ofile) np.savetxt(ofile, ptgas) if pDustSizeDistribution is not None: ofile = os.path.join(outdir, "pluto_dustsizedist.dat") print(ofile) with open(ofile, "w") as outfile: na = len(pDustSizeDistribution.asize) outfile.write(str(na) + "\n") outfile.write(" ".join(["%.10E"] * na) % tuple(pDustSizeDistribution.asize) + "\n") for ia in range(na): np.savetxt(outfile, pDustSizeDistribution.fsize_rho[ia, :, :], fmt="%.10E") return
[docs] def write_fits(self, filename="in2D.fits", comments=None): """ NOT YET IMPLEMENTED! Write the 2D data of this object as input for |prodimo| as a fits file. Quantities that are `None` will be skipped. Parameters ---------- rhoGas : array_like the gas density in g/cm3; DIMENSION: (nx,nz) nHtot : array_like the particle number density in g/cm3; DIMENSION: (nx,nz) g2d : array_like the gas to dust mass ratio; DIMENSION: (nx,nz) velocity : array_like disk velocity vector at each position in the disk vx,vy,vz in cm/s; DIMENSION: (nx,nz,3) filename : str the output filename/path optional. comments : don't know yet comments added to the fits file """ print("Not yet implemented!") return
[docs] def get_pmodel(self) -> Data_ProDiMo: """ Returns the data as a :class:`~prodimopy.read.Data_ProDiMo` object. Can be useful for plotting (e.g. use the prodimopy plotting routines) .. todo :: - something very similar is done in toProDiMo. Maybe that can be merged somehow. Returns ------- Data_ProDiMo the prodimopy representation of this 2D input structure. """ pmodel = Data_ProDiMo(name="input model") pmodel.x = self.x / autocm pmodel.z = self.z / autocm pmodel.nx = pmodel.x.shape[0] pmodel.nz = pmodel.x.shape[1] if self.rhoGas is None: pmodel.nHtot = self.nHtot else: pmodel.rhog = self.rhoGas # Fixme: that does not work if rhoGas is not set if self.g2d is not None: pmodel.d2g = 1.0 / self.g2d # that does not work if we just have nHtot ... so just don't do it # pmodel.rhod=self.rhoGas*pmodel.d2g pmodel.td = self.tdust pmodel.tg = self.tgas if self.velocity is not None: pmodel.velocity = self.velocity / 1.0e5 else : pmodel.velocity = None if self.dustSDF is not None: pmodel=self.dustSDF.fill_pmodel(pmodel) return pmodel
[docs] def init_from_text(model: Data_ProDiMo) -> Interface2Din: """ Reads the `pluto_*` files from the current model directory, fills a new :class:`~prodimopy.interface2D.infile.Interface2Din` object and returns it. Parameters ---------- model : A ProDiMo model object that is associated with the corresponding pluto files. (E.g. has the same grid etc.) """ # just use a dummy for rhogas first. out=Interface2Din(model.x,model.z,rhoGas=[]) fname=os.path.join(model.directory, "pluto_rho.dat") if os.path.isfile(fname): out.rhoGas=np.loadtxt(fname) fname = os.path.join(model.directory, "pluto_gd.dat") if os.path.isfile(fname): out.g2d= np.loadtxt(fname) fname = os.path.join(model.directory, "pluto_tdust.dat") if os.path.isfile(fname): out.tdust = np.loadtxt(fname) fname = os.path.join(model.directory, "pluto_tgas.dat") if os.path.isfile(fname): out.tgas = np.loadtxt(fname) for iv,vname in enumerate(["vx","vy","vz"]): fname = os.path.join(model.directory, f"pluto_{vname}.dat") if os.path.isfile(fname): if out.velocity is None: out.velocity = np.zeros(shape=(out.nx, out.nz, 3), dtype=np.float64) out.velocity[:, :, iv] = np.loadtxt(fname) fname = os.path.join(model.directory, "pluto_dustsizedist.dat") if os.path.isfile(fname): # read dust size distribution with open(fname, "r") as infile: na = int(infile.readline().strip()) asize = np.array( [float(x) for x in infile.readline().strip().split()], dtype=np.float64 ) fsize_rho = np.zeros(shape=(na, out.nx, out.nz), dtype=np.float64) for ia in range(na): for ix in range(out.nx): line = infile.readline().strip() fsize_rho[ia, ix, :] = np.array( [float(x) for x in line.split()], dtype=np.float64 ) out.dustSDF = DustSizeDistribution(asize, fsize_rho) return out