Source code for prodimopy.chemistry.network

'''
Created on 15 Feb 2021

@author: Christian Rab
'''
from abc import ABC,abstractmethod
import decimal
import math
from timeit import default_timer as timer

import numpy as np


[docs]class ReactionNetwork(ABC): ''' General representation of a chemical reaction network. .. todo:: make it an abstract class that requires the child classes to implement the load_reactions routine. Currently this only works for the Reactions.out from ProDiMo. ''' def __init__(self,name="ReactionNetwork"): ''' Create a new Reaction Network. ''' self.name=name ''' string : A name for the reaction network ''' self.reactions=list() ''' list(:class:`Reaction`) : The list of reactions included in the network. ''' self._species=list() ''' list(str) : The list of species derived from the list of Reactions. Is create via a property function. ''' self.filename=None ''' string : The name/path of the file containing the Reaction network. ''' self.non_species=['PE','PHOT','PHOTON','XPHOT','M','ELECTR','PHOTON', 'GRAIN','FORM','CRPHOT','CRP','SPONT','DESCR','DESPH','DESTH', 'CRP','CP','e-','M',"dust","RADASS","PUMP"] ''' A list of Species names that are considerd as non species (not really a chemical species) but might be included in the network. FIXME: provide a way to change that list ''' @property def species(self): ''' The list of individual species in the network. ''' # only load if not yet loaded. if len(self._species)==0: # Create a species list for reac in self.reactions: for spec in reac.reactants+reac.products: # print(spec) if not spec in self._species and spec not in self.non_species: self._species.append(spec) self._species.sort() return self._species def __str__(self): return "Name: "+self.name+"\n"+" File : "+str(self.filename)+"\n"+" Number of Reactions: "+str(len(self.reactions))+"\n Number of Species: "+str(len(self.species))
[docs] def duplicates(self): ''' Returns possible duplicate reactions in the network. TODO: is quite slow TODO: properly define what is a duplicate Returns ------- list(:class:`Reaction`) : Returns a list of the duplicate Reactions or an empty list if there are no duplicates ''' dups=list() # starttime=timer() for ridx,reac in enumerate(self.reactions): for reac2 in self.reactions[(ridx+1):]: if (self._duplicatereaction(reac.compare(reac2))): dups.append((reac,reac2)) break # print("Time: ","{:4.2f}".format(timer()-starttime)+" s") return dups
[docs] def renameSpecies(self,old,new,variants=["","+","#"],log=True): ''' Renames species with name `old` with species name `new` in the whole network. Parameters ---------- old : str The species name of the species that should be renamed. new : str The new name of the species. variants : array_like(str) extension to the species names that should also be considered for renaming. for example if `variants=["","+","#"] and the old name is SP and the new name is SPNEW the routine would replace also species `SP,SP+,SP#` with `SPNEW,SPNEW+,SPNEW#` , respectively. Types has to at least contain one element. Default: `["","+","#"]` log : boolean log on changes on the screen. Default: True Returns ------- int : The total number of changed Reactions. ''' # number of changed done nchanged=0 for reac in self.reactions: for variant in variants: spec=old+variant nspecex=new+variant # indices of the prodcuts the match the species ip=[i for i,e in enumerate(reac.products) if e==spec] ir=[i for i,e in enumerate(reac.reactants) if e==spec] if len(ip)>0 or len(ir)>0: if log: print("Replace "+spec+" with "+nspecex+" in reaction ") print(reac) nchanged+=1 for i in ip: reac.products[i]=nspecex for i in ir: reac.reactants[i]=nspecex # reset internal species so that is is reloaded again in method `species self._species=list() return nchanged
[docs] def compareSpecies(self,reactionNetwork): ''' Compares the species list of two networks. Parameters ---------- reactionNetwork : :class:`ReactionNetwork` Another reaction network Returns ------- (tuple) : * `boolean` ... equal or not * list ... list of species only in old network * list ... list of species only in new network ''' specOld=self.species specNew=reactionNetwork.species # propably not very fast but fast enough I guess onlyOld=list() for spec in specOld: if spec not in specNew: onlyOld.append(spec) onlyNew=list() for spec in specNew: if spec not in specOld: onlyNew.append(spec) # if both empty the species are the same, return (not onlyOld and not onlyNew,onlyOld,onlyNew)
[docs] def compare(self,reactionNetwork,printresults=False,eqfunc=None,chgfunc=None,log=True): ''' Compares the network to the given reaction network. Currently simply prints out the results to stdout. Parameters ---------- reactionNetwork : :class:`ReactionNetwork` Another reaction network printresults : boolean Print the results to stdout eqfunc : Pass a function that decides if two Reactions are equal. The function must take as an argument the outcome of the Reaction compare function. chgfunc : Pass a function that decides if two Reactions are the same Reactoin but some other quantities changes (coefficients, type etc). The function must take as an argument the outcome of the Reaction compare function. log : boolean Include log output (print statements) or not. Returns ------- (tuple) : * `boolean` ... equal or not * list ... list of changed reactions * list ... list of reactions only in this network * list ... list of reactions only in the passed network ''' starttime=timer() if log: print("INFO: compare ",self.name," to ",reactionNetwork.name," ...") # Copy the reaction list as we will change it reactionsNew=reactionNetwork.reactions.copy() eqf=self._equalreaction if eqfunc is not None: eqf=eqfunc chgf=self._changedreaction if chgfunc is not None: chgf=chgfunc # Compare the two databases changed=list() onlyold=list() for reac in self.reactions: found=False for reacNew in reactionsNew: equal=reac.compare(reacNew) if eqf(equal): # found the equal reaction ... done found=True # now we can remove it is already found reactionsNew.remove(reacNew) break elif chgf(equal): # found a changed reactions ... done changed.append((reac,reacNew,equal)) found=True # now we can remove it is already found reactionsNew.remove(reacNew) break # collect the ones only in the old if not found: onlyold.append(reac) # the rest must be the onlynew ones onlynew=reactionsNew equal=len(changed)==0 and len(onlyold)==0 and len(onlynew)==0 if log: print("INFO: time: ","{:4.2f}".format(timer()-starttime)+" s") if printresults: print(" ") print("FOUND "+str(len(changed))+" CHANGED REACTIONS: ") for cr in changed: print("OLD: ",cr[0]) print("NEW: ",cr[1]) print("EQUAL: ",cr[2]) print(" ") print(" ") print(" ") print("FOUND "+str(len(onlyold))+" REACTIONS THAT ONLY EXIST IN NETWORK "+self.name+" (OLD): ") for r in onlyold: print(r) print(" ") print(" ") print("FOUND "+str(len(onlynew))+" REACTIONS THAT ONLY EXIST IN NETWORK "+reactionNetwork.name+" (NEW): ") for r in onlynew: print(r) print(" ") return equal,changed,onlyold,onlynew
def _duplicatereaction(self,eq): ''' Defines what is a duplicate reaction depending on the eq dictionary. Parameters ---------- eq : dictionary The equal dictionsary see ''' return eq["reactants"] and eq["products"] def _equalreaction(self,eq): ''' Are the reactions equal? ''' return eq["reactants"] and eq["products"] and eq["coeffs"] and eq["rate"]==True def _changedreaction(self,eq): ''' Has the reaction changed? ''' return (eq["reactants"] and eq["products"]) and (eq["coeffs"]==False or eq["type"]==False or eq["gasphase"]==False or eq["rate"]==False)
[docs] @abstractmethod def load_reactions(self,filename=None): ''' In this routine the reading of the Reaction network needs to be implemented. Fills in the `reactions` field in this class. Parameters ---------- filename : str The name/path of the file to read ''' pass
[docs] @abstractmethod def load_rates(self,filename=None): ''' In this routine the reading of rate coeeficients need to be implemented. Parameters ---------- filename : str The name/path of the file to read ''' pass
[docs]class Reaction(object): ''' General representation of one chemical reaction. ''' def __init__(self): self.id=0 """ int : The Reaction Id as is """ self.idU=0 """ int : The Umist reaction id ... it is unclear what this actually is. """ self.type="" """ string : The Reation type identifier as used in ProDiMo """ self.gasphase=None """ boolean : Gas phase reaction or not. """ self.reactants=list() """ array_like : The list of reaction reactants (Species names) """ self.products=list() """ array_like : The list of reaction products (Species names) """ self.coeffs=np.zeros(shape=(3)) """ array_like(float,shape=) : The `[alpha,beta,gamma]` coefficients per temperature range. For multipe temperature ranges it will become a 2D array with shape (3,number of t ranges) e.g. `[[alpha,beta and gamma],[alpha2,beta2,gamma2]]` for two temperature ranges. FIXME: might be hard to handle as it can either be 1D or 2D array. Could be fixed with a @property decorator """ self.temps=np.zeros(shape=(2)) """ array_like The temperature range ([min,max]). Can have more entries if there are several temperature ranges. The shape will than be (2,number of t ranges)). FIXME: might be hard to handle as it can either be 1D or 2D array. """ self.rate=None """ float : The real rate for a given set of physical conditions. """ self.clem=None """ str : Something from UMIST do not know what it is. """ self.accuracy=None """ str : A string describing the quality of the reactions coefficients """ self.comment="" """ str : Any relevant comment for this particular reaction. """ # this is just for internal use to have coeff also as a decimal, to take into account the strange # format of Reactions.in ... will only be filled in that case self._coeff2=None
[docs] def compare(self,reaction): ''' Compares the Reaction to the passed reaction. The routines evaluates for a selection of the properties from `class:`Reaction` individually if it is equal or not. See below for the return value. Parameters ---------- reaction : :class:`Reaction` Returns ------- dictionary : A dictionary with boolean values with the following keys. The keys have the same names as the properties of the :class:`Reaction`. * `type` * `gasphase` * `reactants` * `products` * `coeffs` * `temps` ..todo : * deal with comparison of the network if rates only exist in one network * the value of the rate coeffs are not compared/checked * make the tolerances for comparison of the rate a parameter somehow ''' equal=dict() # if self.id != reaction.id: # equal["id"]=False # if self.idU != reaction.idU: # equal["idU"]=False equal["type"]=self.type==reaction.type equal["gasphase"]=self.gasphase==reaction.gasphase equal["reactants"]=sorted(self.reactants)==sorted(reaction.reactants) equal["products"]=sorted(self.products)==sorted(reaction.products) equal["coeffs"]=True # could be different because of multiple temperature ranges flat=self.coeffs.flatten() flatNew=reaction.coeffs.flatten() if len(flat)!=len(flatNew): equal["coeffs"]=False else: if not all(flat==flatNew): equal["coeffs"]=False equal["temps"]=True # could be different because of multiple temperature ranges # check for Nones if self.temps is None or reaction.temps is None: equal["temps"]=False else: flat=self.temps.flatten() flatNew=reaction.temps.flatten() if len(flat)!=len(flatNew): equal["temps"]=False else: if not all(flat==flatNew): equal["temps"]=False # TODO: if not loaded in both reactions, the comparison does not make sense # TOD # Maybe write an error. if self.rate!=None and reaction.rate!=None: # print(self.rate,reaction.rate,math.isclose(self.rate,reaction.rate,rel_tol=1e-20,abs_tol=0.0)) equal["rate"]=math.isclose(self.rate,reaction.rate,rel_tol=1e-4,abs_tol=0.0) else: equal["rate"]=True return equal
def __str__(self): out=str(self.id)+" "+str(self.idU)+" "+str(self.type)+" "+str(self.reactants)+" "+str(self.products)+" "+str(self.coeffs)+" "+str(self.temps) if self.rate!=None: out=out+"{:13.5e}".format(self.rate) out=out+" "+self.comment return out
[docs]class ReactionNetworkPout(ReactionNetwork): ''' Implementation of ReactionNetwork for the Reaction.out of a ProDiMo model. ''' def __init__(self,name="ReactionNetworkPout",modeldir=None): ''' Parameters ---------- modeldir : str If set the init routine trys to load the Reactions.out and the rates log file (at the moment). But continues if it does not work ''' super().__init__(name=name) if self.name==None and modeldir is not None: self.name=modeldir if modeldir is not None: try: self.load_reactions(filename=modeldir+"/Reactions.out") except: print("WARN: Could not load reactions from Reactions.out") try: self.load_rates(filename=modeldir+"/rate_coeffs_1NZZ.log") except: print("WARN: Could not load rate coefficients from rate_coeffs_1NZZ.log") print(" ")
[docs] def load_reactions(self,filename="Reactions.out"): ''' Reads a reaction network in the format of the ProDiMo Reactions.out. Fills in the `reactions` field in this class. Parameters ---------- filename : str The name/path of the file to read ''' fr=open(filename) self.filename=filename lines=fr.readlines() # stupid workaround for old format oldformat=False try: for line in lines: if not oldformat: fields=line.split() try: tidx=int(fields[-6]) except ValueError: print("WARN: Try to read an older format ... let's hope it works!") oldformat=True if oldformat: # now insert some spaces at the right places for the coefficients line=line[:23]+" "+line[23:] line=line[:53]+" "+line[53:] line=line[:-43+8]+" "+line[-43+8:] line=line[:-43+17]+" "+line[-43+17:] fields=line.split() tidx=int(fields[-6]) # new Reaction if tidx==1: reac=Reaction() # new reactions, append it and fill it self.reactions.append(reac) reac.id=int(fields[0]) reac.idU=int(fields[1]) reac.type=fields[2] if oldformat: reac.gasphase=True reacprod=fields[3:-6] else: reac.gasphase=fields[3].strip()=="T:" reacprod=fields[4:-6] reac.coeffs[:]=[float(fields[-5]),float(fields[-4]),float(fields[-3])] reac.temps[:]=[float(fields[-2]),float(fields[-1])] # print(reacprod) nextisprod=False for entry in reacprod: if entry=="-->": nextisprod=True elif entry=="+": continue else: if nextisprod: reac.products.append(entry) else: reac.reactants.append(entry) else: reac=self.reactions[-1] reac.coeffs=np.vstack((reac.coeffs,[float(fields[-5]),float(fields[-4]),float(fields[-3])])) reac.temps=np.vstack((reac.temps,[float(fields[-2]),float(fields[-1])])) except Exception as e: print(line) print(e) fr.close() raise e # print(reac) fr.close() print("Loaded ",len(self.reactions)," Reactions from ",self.filename)
def _equalreaction(self,eq): ''' Are the reactions equal? ''' # here we also have the type return super()._equalreaction(eq) and eq["type"]==True
[docs] def load_rates(self,filename="rate_coeffs_1NZZ.log"): ''' Load the real rates for a given set of physical conditions. Can be use for comparison. Has to be run after load_reactions. .. todo:: Is not general; just a test at the moment. ''' rates=np.loadtxt(filename,usecols=[1]) for i,reaction in enumerate(self.reactions): reaction.rate=rates[i]
[docs]class ReactionNetworkPin(ReactionNetwork): ''' Implementation of ReactionNetwork for the Reactions.in of ProDiMo. ''' def __init__(self,name="ReactionNetworkPin",modeldir=None): ''' Parameters ---------- name : str a name for the network modeldir : str If set, the init routine trys to load the `Reactions.in` from modeldir. But continues if it does not work. ''' super().__init__(name=name) self.modeldir=modeldir if self.name==None and modeldir is not None: self.name=modeldir if modeldir is not None: try: self.load_reactions(filename=modeldir+"/Reactions.in") except Exception as err: print("WARN: Could not load reactions from Reactions.in",err) print(" ") def _equalreaction(self,eq): ''' Are the reactions equal? ''' return eq["reactants"] and eq["products"] and eq["coeffs"] def _changedreaction(self,eq): ''' Has the reaction changed? ''' return (eq["reactants"] and eq["products"]) and (eq["coeffs"]==False)
[docs] def load_reactions(self,filename="Reactions.in",fmt=None,threeReactants=True): ''' Reads a reaction network in the format of the ProDiMo Reactions.in Fills in the `reactions` field in this class. .. todo:: Include reading of T-dependent rates for the csv format. More sophisticated guessing of the file format. Parameters ---------- filename : str The name/path of the file to read. fmt : str Format of the file. Currently either `in` for old Reactions.in or `csv` for the UMIST csv format. If `None` the routine trys to guess the format (primitive at the moment). ''' # guess the format old .in or csv format if fmt is None: csv=filename.strip().endswith(".csv") elif fmt=="csv": csv=True else: csv=False fr=open(filename) self.filename=filename lines=fr.readlines() if csv: for line in lines: if line.strip()=="": continue if line.strip().startswith("#"): continue fields=line.split(",") reac=Reaction() reac.id=int(fields[0]) reac.idU=None reac.type=fields[1] reac.clem=fields[12].strip() reac.temps[:]=[float(fields[13].strip()),float(fields[14].strip())] reac.accuracy=fields[15].strip() reac.comment=fields[16].strip() # three reactants and four products for i,sp in enumerate(fields[2:9]): if sp=="": continue if i<3: reac.reactants.append(sp) else: reac.products.append(sp) reac.coeffs[:]=[float(x.replace("D","E")) for x in fields[9:12]] self.reactions.append(reac) else: for line in lines: if line.strip()=="": continue # do it similar to ProDiMo # print(line) idnum=line[0:5] specs=line[6:61] ABC=line[61:89].split() comm=line[89:] reac=Reaction() reac.id=int(idnum.strip()) reac.idU=None reac.type=None reac.temps=None # lensp=8 # for i in range(7): # sp=specs[i*lensp:8+i*lensp].strip() # if i<3 and sp!="": # reac.reactants.append(sp) # elif i>=3 and sp!="": # reac.products.append(sp) # very tricky, each species should take 8 characters, but the reading routine of PRoDiMo works # also if one is only 7 charachters (don't know how), but it seems in the end the species can have # max 7 characters. lensp=8 idxnextsp=0 for i in range(7): sp=specs[idxnextsp:(idxnextsp+lensp)].strip() # if (sp.strip()=="1"): return # print(i,sp,idxnextsp,specs[idxnextsp:(idxnextsp+lensp)],"+",specs[idxnextsp+lensp],"+") if i<3 and sp!="": reac.reactants.append(sp) elif i>=3 and sp!="": reac.products.append(sp) if i==6: # last species idxnextsp+=(lensp-1) else: idxnextsp+=lensp # if (idxnextsp+lensp)<61 and specs[idxnextsp+lensp]!=" ": idxnextsp-=1 reac.coeffs[:]=[float(x.replace("D","E")) for x in ABC] reac._coeff2=decimal.Decimal(ABC[1].strip()) # This is just for output and only for Reactions.in reac.comment=comm.strip() self.reactions.append(reac) fr.close() print("Loaded ",len(self.reactions)," Reactions from ",self.filename)
[docs] def write_reactions(self,filename="Reactions.in.new",fmt=None): ''' Writes the network to a file. The default format is the one from ProDiMo Reactions.in. .. todo:: does not work yet for reactions with multiple temperatures. .. warning:: Not well tested ... so be carefull. Parameters ---------- filename : str The name/path of the file to read fmt : str If `None` format is the one from ProDiMo. if `csv` it is written in a csv mode. ''' fw=open(filename,"w+") spfmt="{:8s}" if fmt=="csv": for reac in self.reactions: fw.write("{:5d}".format(reac.id).strip()) fw.write(",") fw.write(reac.type.strip()) fw.write(",") for i in range(3): if i<len(reac.reactants): fw.write(reac.reactants[i].strip()) else: fw.write("") fw.write(",") for i in range(4): if i<len(reac.products): fw.write(reac.products[i].strip()) else: fw.write("") fw.write(",") # assume that it is always positiv fw.write("{:8.2E}".format(reac.coeffs[0]).strip()) fw.write(",") # can also be negative fw.write("{:+8.5F}".format(reac.coeffs[1]).strip()) fw.write(",") # assume that it is always positiv fw.write("{:12.4f}".format(reac.coeffs[2]).strip()) fw.write(",") fw.write(reac.clem.strip()) for i in range(2): fw.write(",") fw.write("{:6.0f}".format(reac.temps[i]).strip()) fw.write(",") fw.write(reac.accuracy.strip()) fw.write(",") fw.write(reac.comment.strip()) fw.write("\n") else: # abcformat="{:+8.2E}" for reac in self.reactions: fw.write("{:5d}".format(reac.id)) fw.write(" ") # Three reactants for i in range(3): if i<len(reac.reactants): fw.write(spfmt.format(reac.reactants[i])) else: fw.write(spfmt.format(" ")) for i in range(4): if i<len(reac.products): fw.write(spfmt.format(reac.products[i])) else: fw.write(spfmt.format(" ")) # fw.write(" ") # assume that it is always positiv fw.write("{:8.2E}".format(reac.coeffs[0])) fw.write(" ") # the B coefficient # Check for numbers that don't comply to the format dig=str(reac._coeff2).split(".") ndig=2 if (len(dig)>1): ndig=len(dig[1]) # can also be negative if reac.coeffs[1]<0.0: fmt="{:+5.2F}" else: fmt="{:5.2F}" if ndig<3: fw.write(fmt.format(reac.coeffs[1])) else: # print(reac,reac._coeff2) fw.write(str(reac._coeff2)) fw.write(" ") # if the B coefficient has more digits than 5 we have a problem # simple make the C coeff less long, like one would need to do if it is written by hand lenstr2=len(str(reac._coeff2)) lenstr3=11 if lenstr2>5: lenstr3=lenstr3-(lenstr2-5) # print(lenstr3) fmt="{:"+str(lenstr3).strip()+".4f}" # assume that it is always positiv fw.write(fmt.format(reac.coeffs[2])) fw.write(" ") fw.write(reac.comment) fw.write("\n") fw.close()
[docs] def load_rates(self,filename=None): print("Not usefull for Reactions.in")