Source code for prodimopy.utils_casasim

"""
.. module:: utils_casasim
.. moduleauthor:: Ch. Rab

"""

import math
import os

import astropy.io.fits as fits
import casatasks as ct
from casatools import image as iatool
import numpy as np


[docs] def radprof(infile,incl,PA,fracbeam=0.33,rms=None,r_out=None): """ Make an azimuthally averaged deprojected radial profile of an 2D fits image. Used casatasks to make the radial profiles. The radial gridding is determined by the beam size. By default 1/3 of the beam size are used to define the width of one annulus. The information of the center of the image, beam sizes etc. are read from the fits header. So they have to be there. Ususullay that routine works fine with images that can be opened within casa. incl and PA have to be in degree. Parameters ---------- infile : str The path/filename of the fits file to use. incl : float The inclination in degrees PA : float The position angle in degrees fracbeam : float What fraction of the beam should be used for the radial spacing. Can also be >1. DEFAULT: 0.33 rms : float rms value of the image. Is used to determine the error. DEFAULT: `None` (no error estimate) r_out : float the maximum outer radius use for the radial profile in arcsec. Default: `None` (whole image is used) Returns ------- r,flux,fluxerr : array_like(ndim=1) Three arrays with the radius (in arcsec) the flux at each radius (units of the Image) and the error for the flux. """ if not os.path.isfile(infile): print("ERROR: Input fitsfile "+infile+" not found.") return None RA=ct.imhead(imagename=infile,mode='get',hdkey='CRVAL1') RA=str(RA["value"])+RA["unit"] DEC=ct.imhead(imagename=infile,mode='get',hdkey='CRVAL2') DEC=str(DEC["value"])+DEC["unit"] print("CENTER:",RA,DEC) BMAJ=ct.imhead(imagename=infile,mode='get',hdkey='BMAJ') BMIN=ct.imhead(imagename=infile,mode='get',hdkey='BMIN') print("BEAM: ",BMAJ["value"],BMAJ["unit"],BMIN["value"],BMIN["unit"]) bmaj=BMAJ["value"] bmin=BMIN["value"] bwidth=(bmaj+bmin)/2.0 # assume it is in degree CDELT1=ct.imhead(imagename=infile,mode='get',hdkey='CDELT1') print("CDELT1: ",CDELT1["value"],CDELT1["unit"]) # convert to arcsec pix_to_arcsec=np.abs(CDELT1["value"])/(math.pi/180.0)*3600.0 print("PIXSCALE: ",pix_to_arcsec," arcsec") # assume a certain fraction of the beam as the width width=fracbeam*bwidth print("WIDTH: ",str(width)+" arcsec") barea=math.pi*bmaj*bmin/(4.0*math.log(2.0)) bwidthpix=bwidth/pix_to_arcsec bareapix=barea/(pix_to_arcsec**2.0) print(barea,bwidth,bareapix,bwidthpix) cosinc=math.cos(incl*math.pi/180.0) if r_out is None: # assume it is a square image NPIX=ct.imhead(imagename=infile,mode='get',hdkey='SHAPE')[0] # take half of NPIC as the maximum extension, and convert it to arcsec r_out=NPIX/2*pix_to_arcsec print("r_out: ",r_out) rings=np.arange(width/2.0,r_out+width,width) # TODO check that but it seems for the ellipse we need a sign change # for the PV diagramm it seems to work. PAstring="-"+str(PA)+"deg" fluxes=list() rs=list() err=list() ioutput=0 for ring in rings: ioutput+=1 if ioutput%10==0: print("DO RING: ",ring,"/",r_out) maje=ring+width/2.0 mine=maje*cosinc maje=str(maje)+"arcsec" mine=str(mine)+"arcsec" region='ellipse[['+RA+', '+DEC+'], ['+mine+', '+maje+'],'+PAstring+']' stat=ct.imstat(imagename=infile,region=region) bigflux=stat['sum'][0] bigarea=stat['npts'][0] maje=ring-width/2.0 # treatment for the first point if maje<1.e-20: smallflux=0.0 smallarea=0.0 else: mine=maje*cosinc maje=str(maje)+"arcsec" mine=str(mine)+"arcsec" region='ellipse[['+RA+', '+DEC+'], ['+mine+', '+maje+'],'+PAstring+']' stat=ct.imstat(imagename=infile,region=region) smallflux=stat['sum'][0] smallarea=stat['npts'][0] # calculate the flux for the annulus ringarea=bigarea-smallarea flux=bigflux-smallflux fluxes.append(flux/ringarea) # average of major and minor axis ... same as the DSHARP people did rs.append(ring) # rs.append((ring+ring*cosinc)/2) if rms is not None: err.append(3.0*rms/(math.sqrt(ringarea/bareapix))) else: err.append(0.0) return np.array(rs),np.array(fluxes),np.array(err)
[docs] def write_radprof(outfile,r,flux,fluxerr): ''' Writes a radial profile to a file in the format used by casairing. This is only for compatibility for older projects. ''' header="# Distance (as), Frequency (GHz), Average (Jy/beam.km/s), Avg. Error (Jy/beam.km/s)" # just a dummy for the moment. Use the same format as the casairing task, that"s why this is here. output=np.column_stack((r,r*0.0,flux,fluxerr)) np.savetxt(outfile,output,header=header)
class _PostProcess(object): def get_beam_refimage(self): ''' Trys to read the beam information from the given reference image. ''' if self.refimage is None: return None try: bmaj=ct.imhead(imagename=self.refimage,mode='get',hdkey='BMAJ') # must be in arcsec bmin=ct.imhead(imagename=self.refimage,mode='get',hdkey='BMIN') # must be in arcsec # FIXME: assumes that it is in degrees and makes a conversion bPA=ct.imhead(imagename=self.refimage,mode='get',hdkey='BPA')['value'] # must be in deg # bPAFull=ct.imhead(imagename=refimagename,mode='get',hdkey='BPA') # print(bPAFull) # print(bPAFull['value'],bPAFull['unit']) # PA has to be in rad I guess beamloc=[str(bmaj["value"])+bmaj["unit"],str(bmin["value"])+bmin["unit"],str(bPA*0.01745329252)+"rad"] except AssertionError as error: print("ERROR: get_beam_refimage: The image "+self.refimage+" was not found by CASA (most likely cause of error).") raise return beamloc def _convole_spatial(self,infile,outfile,beam,noise=None): ia=iatool() ia.fromfits(infile=infile) # FIXME: quick hack for noise, currently one has to experiment with the value # to e.g. get the desired rms if noise is not None: # noise is given in Jy/beam print("INFO: Adding noise: ",noise) # print(ia.statistics()) ia.addnoise(type="uniform",pars=[-noise,noise],zero=False,seeds=[1,2]) # print(ia.statistics()) imconv=ia.convolve2d(major=beam[0],minor=beam[1],pa=beam[2],outfile=outfile) # beamarea=imconv.beamarea()["pixels"] # print(beamarea) imconv.done() ia.close() def _radial_profile(self,fitsfile,incl=None,PA=None,fracbeam=0.33,rms=None,r_out=None): print("radial_profile for "+fitsfile+" ...") if incl is None: incl=self.incl if PA is None: PA=self.PA if incl is None or PA is None: print("ERROR: Please provide inclination and position angle.") outfile=fitsfile.replace(".fits",".radial") write_radprof(outfile,*radprof(fitsfile,incl,PA,fracbeam=fracbeam,rms=rms,r_out=r_out)) print("Produced radial profile: "+outfile)
[docs] class ContinuumPP(_PostProcess): """ Object to post-process ProDiMo continuum images for CASA. """ def __init__(self,proj,modelfits,chans=None,band=None,incl=None,PA=None, refimage=None,singleimage=False,removeBackground=False): """ Atrributes ---------- """ self.proj=proj """ string : A project name which will be used to store the casa files in a directoy with name `proj`. Also all files will start with the prefix `proj`. """ self.modelfits=modelfits """ string : The path/filename for the continuum images output of ProDiMo. Usually this file is called image.fits. """ self.chans=chans """ string : A string with the desired channel number for the image. E.g. "0" selctes the first one. "50,51,51" will produce an average of this three channels. """ self.band=band """ array_like(ndim=1) : start and end wavelength `[start,end]` in [micron] of a "band". All images found withing this wavelength range will be combined to one. """ self.incl=incl self.PA=PA self.refimage=refimage self.projn=os.path.basename(proj) self.fprefix=self.proj+"/"+self.projn+"." self.fprefixtmp=self.proj+"/"+self.projn+"tmp." self.wlgrid=None self.singleimage=singleimage self.fmimage=self.fprefix+"mcont.fits" self.fmimageconv=self.fprefix+"cont.conv.fits" self.fmimageregrid=self.fprefix+"cont.conv.regrid.fits" self.fmimagefinal=self.fprefix+"cont.fits" os.system("rm -rfv "+proj+"/"+self.projn+"*") print("make Directory"+proj) os.system("mkdir "+proj) if (self.chans is None and self.band is None): raise ValueError("Either chans or band have to be set.") if not singleimage: self._read_wlgrid() self._init_image(removeBackground=removeBackground) def _read_wlgrid(self): """ Reads the wavelength grid from ProDiMo continuum fits image. TODO: Could be a general utility function. """ ct.importfits(fitsimage=self.modelfits,imagename=self.fprefixtmp+"wlgrid.im",overwrite=True,whichhdu=1) self.wlgrid=np.array(ct.imval(self.fprefixtmp+"wlgrid.im")["data"]) # print("Wavelength grid of image.fits",wlgrid) os.system("rm -rfv "+self.fprefixtmp+"wlgrid.im") def _init_image(self,removeBackground=False): """ Gets the desired image frome the image.fits produced by PRoDiMo. image.fits includes all the continuum images. Currently the image can be selected by providing the index (e.g. 0 first image) to select the image from image.fits. If a lilst of indices is provided the routine combines those images into one. This is usefull to produce an averaged image for e.g. one alma Band. TODO: make the routine more user friendly. For example one could provide the wavelength instead of the indices. """ print("init_image ...") if self.singleimage: print("cp -v "+self.modelfits+" "+self.fmimage) os.system("cp -v "+self.modelfits+" "+self.fmimage) else: if self.band is not None: # findd the indices (chanels) idxstart=np.argmax(self.wlgrid>=self.band[0]) idxend=np.argmin(self.wlgrid<=self.band[1]) wls=self.wlgrid[idxstart:idxend] idxs=list(map(str,np.arange(idxstart,idxend,1))) self.chans=",".join(idxs) nwl=len(idxs) print("Band: ",self.band,"combining channels (wavelengths): ",self.chans,wls) else: idxs=list(map(int,self.chans.split(","))) wls=[self.wlgrid[idx] for idx in idxs] nwl=len(idxs) print("combining channels (wavelengths): ",self.chans,wls) # FIXME: not nice # STRANGE: the oufiles must end with .image ... outerwise casa gives an error if nwl==1: ct.imsubimage(imagename=self.modelfits,outfile=self.fprefixtmp+"sub.image",chans=self.chans,dropdeg=True) else: ct.imsubimage(imagename=self.modelfits,outfile=self.fprefixtmp+"sub2.image",chans=self.chans,dropdeg=True) ct.imrebin(imagename=self.fprefixtmp+"sub2.image",outfile=self.fprefixtmp+"sub.image",factor=[1,1,nwl],dropdeg=True) ct.exportfits(fitsimage=self.fmimage,imagename=self.fprefixtmp+"sub.image",dropdeg=True,overwrite=True) os.system("rm -rfv "+self.fprefixtmp+"sub.image") os.system("rm -rfv "+self.fprefixtmp+"sub2.image") # FIXME: quick hack to remove the background, havent found another way yet, maybe just do it with astropy if (removeBackground): print("INFO: remove Background") ia=iatool() ia.fromfits('tmpimg',infile=self.fmimage,overwrite=False) # print(ia.summary()) # print(ia.name()) expr=r"tmpimg-min(tmpimg)" ia.calc(expr) ia.done() ct.exportfits(fitsimage=self.fmimage,imagename="tmpimg",dropdeg=True,overwrite=True) os.system("rm -rfv tmpimg")
[docs] def cleanup(self): # just to be sure, delete all casa stuff os.system("rm -rfv "+self.fprefix+"*.image") os.system("rm -rfv "+self.fprefix+"*.im") os.system("rm -fv "+self.fmimageconv) os.system("rm -fv "+self.fmimageregrid) os.system("rm -fv "+self.fmimage)
[docs] def conv_image(self,beam=None,noise=None): if beam is None: beam=self.get_beam_refimage() if beam is None: print("ERROR: Cannot convolve image, no beam found.") print("conv_image with beam: ",beam) outfile=self.fmimageconv.replace(".fits",".im") self._convole_spatial(self.fmimage,outfile,beam,noise=noise) ct.exportfits(fitsimage=self.fmimageconv,imagename=outfile,dropdeg=True,overwrite=True) os.system("rm -rfv "+outfile) # copy it to the final image, can still be overwritten os.system("cp "+self.fmimageconv+" "+self.fmimagefinal)
[docs] def regrid_image(self,fitsfile=None): if self.refimage is None: return None if fitsfile is None: fitsfile=self.fmimageconv print("regrid_image to "+self.refimage) RA,DEC=self.get_coord_refimage() outfile=fitsfile.replace(".fits",".im") ia=iatool() ia.fromfits(infile=fitsfile,outfile=outfile) ia.close() # set the center coordinates ct.imhead(imagename=outfile,mode='put',hdkey='CRVAL1',hdvalue=RA) ct.imhead(imagename=outfile,mode='put',hdkey='CRVAL2',hdvalue=DEC) # this is required in case one wants to do some regridding ct.imhead(imagename=outfile,mode='put',hdkey='TELESCOPE',hdvalue='ALMA') ia=iatool() ia.fromfits(infile=self.refimage) cs1=ia.coordsys() s1=ia.shape() # print(s1) ia.close() ia=iatool() ia.open(infile=outfile) newim=ia.regrid(shape=s1,csys=cs1.torecord(),overwrite=True,decimate=1) self.fmimageregrid=fitsfile.replace(".fits",".regrid.fits") newim.tofits(outfile=self.fmimageregrid,overwrite=True) newim.done() ia.close() os.system("rm -rfv "+outfile) os.system("cp "+self.fmimageregrid+" "+self.fmimagefinal)
[docs] def residual_image(self): ''' ''' if self.refimage is None: return None print("residual_image ...") outfile=self.fmimagefinal.replace(".fits",".im") val=ct.immath(imagename=[self.refimage,self.fmimageregrid],expr='IM0-IM1',outfile=outfile) ct.exportfits(outfile,fitsimage=self.fmimagefinal.replace(".fits",".diff.fits"),dropdeg=True,overwrite=True) os.system("rm -rfv "+outfile)
[docs] def radial_profile(self,incl=None,PA=None,fracbeam=0.33,rms=None,r_out=None,fitsfile=None): if fitsfile is None: fitsfile=self.fmimagefinal self._radial_profile(fitsfile,incl=incl,PA=PA,fracbeam=fracbeam,rms=rms,r_out=r_out)
[docs] def get_coord_refimage(self): ''' ''' if self.refimage is None: return None RA=ct.imhead(imagename=self.refimage,mode='get',hdkey='CRVAL1') RA=str(RA["value"])+RA["unit"] DEC=ct.imhead(imagename=self.refimage,mode='get',hdkey='CRVAL2') DEC=str(DEC["value"])+DEC["unit"] return RA,DEC
[docs] def doall(self): ''' ''' print("doall ...") self.conv_image() self.regrid_image() self.radial_profile() self.residual_image() self.cleanup()
[docs] class LinePP(_PostProcess): """ Object to post-process ProDiMo linecubes for comparison to observations. This is all still very experimental and everything can still change. """ def __init__(self,proj,modelfits,sys_vel=None,incl=None,PA=None,refimage=None): """ Atrributes ---------- """ self.proj=proj """ str : A project name which will be used to store the casa files in a directoy with name `proj`. Also all files will start with the prefix `proj`. """ self.modelfits=modelfits """ str : The path/filename for the line cube output of ProDiMo. Usually this file is called LINE_3D_xxx.fits. """ self.incl=incl self.PA=PA self.lineid=None """ str : The identificatoin string for the spectral line (From ProDiMo). """ self.linewl=None """ str : The wavelength of the line [micron] """ self.linefreq=None """ str : The frequency of the line [GHz] """ self.sys_vel=None self.refimage=refimage self.projn=os.path.basename(proj) self.fprefix=self.proj+"/"+self.projn+"." self.fprefixtmp=self.proj+"/"+self.projn+"tmp." self.fmimage=self.fprefix+"mcube.fits" self.fmimage_contsubm=self.fprefix+"mcube.contsub.fits" self.fmimage_contsubmcont=self.fprefix+"mcube.contsubcont.fits" self.fmimageconv=self.fprefix+"cube.conv.fits" self.fmimageregrid=self.fprefix+"cube.conv.regrid.fits" self.fmimagefinal=self.fprefix+"cube.fits" self.fmimagemom0=self.fprefix+"cube.integrated.fits" self.fmimagemom1=self.fprefix+"cube.mom1.fits" os.system("rm -rfv "+proj+"/"+self.projn+"*") print("make Directory"+proj) os.system("mkdir -p "+proj) self._init_image() def _init_image(self): ''' Copies the original image to avoid any troubles, and reads certain header attributes from the ProDiMo fits file (e.g. incl PA ) if they were not overwritten. ''' os.system("cp "+self.modelfits+" "+self.fmimage) fitscube=fits.open(self.fmimage) self.lineid=(fitscube[0].header["P_LINEID"]).strip() self.linewl=float(fitscube[0].header["P_LINEWL"]) self.linefreq=float(fitscube[0].header["P_LINEFR"]) if self.incl is None: self.incl=fitscube[0].header["P_INCL"] if self.PA is None: self.PA=fitscube[0].header["P_PAOBJ"] fitscube.close() def __str__(self): output="Line cube: "+self.lineid.split()[0]+" "+"{:7.4e} mic".format(self.linewl)+" / {:7.4e} GHz".format(self.linefreq)+"\n" output+="Filename: "+str(self.fmimage) output+=" / Inclination: "+str(self.incl) output+=" / Position angle: "+str(self.PA) output+="\n" return output
[docs] def cleanup(self): # just to be sure, delete all casa stuff os.system("rm -rfv "+self.fprefix+"*.image") os.system("rm -rfv "+self.fprefix+"*.im")
[docs] def conv_image(self,infits,outfits,beam=None,noise=None): if beam is None: beam=self.get_beam_refimage() if beam is None: print("ERROR: Cannot convolve image, no beam found.") print("conv_image with beam: ",beam," ...") outfile=outfits.replace(".fits",".im") self._convole_spatial(infits,outfile,beam,noise=noise) ct.exportfits(fitsimage=outfits,imagename=outfile,dropdeg=True,overwrite=True) os.system("rm -rfv "+outfile) print("Produced convolved cube: "+outfits)
# os.system("rm -rfv "+outfile) # self.lastimage=outfits # copy it to the final image, can still be overwritten # os.system("cp "+self.fmimageconv+" "+self.fmimagefinal) # def conv_casaimage(self,infile,outfits,beam=None): # ''' # Spatially convolves the given casa images and exports it to a fits file. # # ''' # if beam is None: # beam=self.get_beam_refimage() # if beam is None: # print("ERROR: Cannot convolve image, no beam found.") # # outfile=infile+".tmp" # self._convole_spatial(infile, outfile, beam) # ct.exportfits(fitsimage=outfits,imagename=outfile,dropdeg=True,overwrite=True) # os.system("rm -rfv "+outfile)
[docs] def momentmap(self,infits,outfits,moment=0,immomentsparams=None): print("momentmap moment "+str(moment)+" ...") outfile=outfits.replace(".fits",".im") if immomentsparams is not None: ct.immoments(infits,outfile=outfile,moments=[moment],**immomentsparams) else: ct.immoments(infits,outfile=outfile,moments=[moment]) ct.exportfits(fitsimage=outfits,imagename=outfile,dropdeg=True,overwrite=True) print("Produced moment "+str(moment)+" map: "+outfits) os.system("rm -rfv "+outfile)
[docs] def radial_profile(self,incl=None,PA=None,fracbeam=0.33,rms=None,r_out=None,infits=None): if infits is None: infits=self.fmimagemom0 self._radial_profile(infits,incl=incl,PA=PA,fracbeam=fracbeam,rms=rms,r_out=r_out)
[docs] def spectral_profile(self,infits=None): if infits is None: infits=self.fmimagefinal outfile=infits.replace(".fits",".specprof") ct.specflux(imagename=infits,logfile=outfile,overwrite=True) print("Produced spectral profile "+outfile)
[docs] def contsub_model(self,removeBackground=False,realcont=False): ''' Removes the continuum. This is done in the image plane. By default it uses directly the model continuum in the cube (this cannot be done in reality). Parameters ---------- removeBackground : bool Remove the background from the continuum result. Simply removes the min value if the continuum image from the image. realcont : bool Uses the real continuum from the cube. This real continuum is included the _CONT output files. This is useful to test the impact of continuum oversubtraction. With this option the absorption of the continuum by the line is considered and therefore the continuum subtraction is exact. But this is not something that can be done in reality. TODO: quick and dirty stuff ... TODO: include method to use certain channels from the cube for the cont subtraction ''' print("contsub_model ...") ct.importfits(fitsimage=self.fmimage,imagename=self.proj+"/cube.im",overwrite=True,whichhdu=0) if realcont: # Use the CONT cube for the exact continuum subtraction ct.importfits(fitsimage=self.modelfits.replace(".fits","_CONT.fits"),imagename=self.proj+"/cont.im",overwrite=True,whichhdu=0) else: ct.importfits(fitsimage=self.fmimage,imagename=self.proj+"/cont.im",overwrite=True,whichhdu=1) rval=ct.immath(imagename=[self.proj+"/cube.im",self.proj+"/cont.im"], expr='IM0-IM1',outfile=self.proj+"/cubemcont.im",imagemd=self.proj+"/cube.im") ct.exportfits(fitsimage=self.fmimage_contsubm,imagename=self.proj+"/cubemcont.im",dropdeg=True,overwrite=True) if removeBackground: rval=ct.immath(imagename=[self.proj+"/cont.im"], expr='IM0-min(IM0)',outfile=self.proj+"/contbg.im",imagemd=self.proj+"/cont.im") ct.exportfits(fitsimage=self.fmimage_contsubmcont,imagename=self.proj+"/contbg.im",dropdeg=True,overwrite=True) os.system("rm -rfv "+self.proj+"/contbg.im") else: ct.exportfits(fitsimage=self.fmimage_contsubmcont,imagename=self.proj+"/cont.im",dropdeg=True,overwrite=True) os.system("rm -rfv "+self.proj+"/cubemcont.im") os.system("rm -rfv "+self.proj+"/cont.im") os.system("rm -rfv "+self.proj+"/cube.im") print("Produced continuum subtraced cube: "+self.fmimage_contsubm)