from __future__ import print_function
import os
import glob
import numpy as np
from astropy.io import fits
from astropy import units as u
from astropy.utils.console import ProgressBar
import dust_emissivity
from . import higal_beams
from .parallel_map import parallel_map
__all__ = ['PixelFitter',
'fit_modified_blackbody_tofiles',
'fit_modified_blackbody_to_imagecube']
[docs]class PixelFitter(object):
def __init__(self, tguess=20, bguess=1.75, nguess=1e22,
trange=[2.73,50], brange=[1,3], nrange=[1e20,1e25],
tfixed=False, bfixed=False, nfixed=False):
"""
Initialize an SED fitter instance with a set of guesses.
The input parameters follow a template that is the same for each
of temperature, beta, and column.
Once initialized, `PixelFitter` can be called as a function of
frequency (Hz), flux (MJy), and error (MJy)
Parameters
----------
guess : float
The guessed value for T,N, or beta. Temperature in Kelvin,
column in :math:`cm^{-2}`
range : tuple or list of length 2
The minimum/maximum values of each parameter
fixed : bool
Is the parameter fixed at the guessed value?
"""
import lmfit
from collections import OrderedDict
parlist = [(n,lmfit.Parameter(name=n,value=x))
for n,x in zip(('T','beta','N'),
(tguess, bguess, nguess))]
parameters = lmfit.Parameters()
parameters.update(OrderedDict(parlist))
assert parameters.keys()[0] == 'T'
assert parameters.keys()[1] == 'beta'
assert parameters.keys()[2] == 'N'
parameters['beta'].vary = not bfixed
parameters['beta'].min = brange[0]
parameters['beta'].max = brange[1]
parameters['T'].vary = not tfixed
parameters['T'].min = trange[0]
parameters['T'].max = trange[1]
parameters['N'].vary = not nfixed
parameters['N'].min = nrange[0]
parameters['N'].max = nrange[1]
self.parameters = parameters
[docs] def __call__(self, frequency, flux, err):
"""
Perform the fit and return a tuple of values & errors
Parameters
----------
frequency : `~astropy.units.quantity.Quantity`
An array of frequencies to be converted to Hz and passed to the
fitter
flux : `~astropy.units.quantity.Quantity`
An array of `u.MJy` equivalent flux values
err : `~astropy.units.quantity.Quantity`
An array of `u.MJy` equivalent flux values that specify the errors
on flux
"""
fitter = dust_emissivity.fit_sed.fit_sed_lmfit_hz
lm = fitter(frequency.to(u.Hz, u.spectral()).value,
flux.to(u.erg/u.cm**2/u.s/u.Hz).value,
err=err.to(u.erg/u.cm**2/u.s/u.Hz).value,
guesses=self.parameters,
blackbody_function='modified')
self.lm = lm
self.vals = (lm.params['T'].value, lm.params['beta'].value,
lm.params['N'].value)
self.errs = (lm.params['T'].stderr, lm.params['beta'].stderr,
lm.params['N'].stderr)
return self.vals,self.errs
[docs] def integral(self, fmin, fmax):
"""
Compute the integral of the currently-fitted parameters
Parameters
----------
fmin, fmax : `~astropy.units.quantity.Quantity`
Frequency-equivalent start and end points for the integral
Returns
-------
The SED integrated in units of
``bbunit = u.erg/u.s/u.cm**2``
"""
integrator = dust_emissivity.blackbody.integrate_sed
return integrator(fmin, fmax,
function=dust_emissivity.blackbody.modified_blackbody,
temperature=self.lm.params['T'].value*u.K,
column=self.lm.params['N'].value*u.cm**-2,
beta=self.lm.params['beta'].value).value
[docs]def fit_modified_blackbody_tofiles(filename_template,
wavelengths=[70,160,250,350,500],
bad_value=0,
name_to_um=higal_beams.name_to_um,
**kwargs
):
"""
Fit a modified blackbody to each pixel in a set of files identified using a
filename template
Parameters
----------
filename_template : str
A filename template that accepts the HiGal wavelength strings, e.g.:
``destripe_l000_{0}_reg.fits``
would format to
``destripe_l000_PMW_reg.fits``
Wildcards are allowed, so you can also do
``HIGAL*_{0}_RM_smregrid45.fits``
wavelengths : list
The wavelengths, in microns, to include in the fit
bad_value : float
A value to mark as bad and ignore. Some files have NaNs indicating bad
points, others have zeros - this is to account for bad pixels that have
values of zero
name_to_um : dict
A dictionary identifying the translation between the string that will
be inserted into the file template and the wavelength. There are two
built in: `higal_sedfitter.higal_beams.name_to_um` and
`higal_sedfitter.higal_beams.num_to_um`.
kwargs : dict
passed to `~higal_sedfitter.fit.fit_modified_blackbody_to_imagecube`
"""
target_files = {name_to_um[x]: filename_template.format(x)
for x in name_to_um}
wavelengths_sorted = sorted(wavelengths)
for k in target_files:
if not os.path.exists(target_files[k]):
G = glob.glob(target_files[k])
if len(G) == 1:
target_files[k] = G[0]
else:
raise IOError("File {0} does not exist".format(target_files[k]))
# Make an image cube appropriately sorted by wavelength
image_cube = np.array([fits.getdata(target_files[wl])
for wl in wavelengths_sorted])
image_cube[image_cube == bad_value] = np.nan
# python 3 doesn't share namespace between loops & main code
wl = wavelengths_sorted[-1]
# wl should be defined from above; the headers should (in principle) be
# identical
outheader = fits.getheader(target_files[wl])
return fit_modified_blackbody_to_imagecube(image_cube, outheader,
wavelengths, **kwargs)
[docs]def fit_modified_blackbody_to_imagecube(image_cube,
outheader,
wavelengths=[70,160,250,350,500],
error_scaling=0.2,
error_cube=None,
pixelfitter=None, ncores=4,
clobber=True,
integral=False, out_prefix="",):
"""
Fit a modified blackbody to each pixel in an image cube. Writes the
results to files of the form ``{out_prefix}+T.fits``, ``{out_prefix}+beta.fits``,
``{out_prefix}+N.fits``, and optionally ``{out_prefix}+integral.fits``.
Parameters
----------
image_cube : `~numpy.ndarray`
A cube constructed from the individual wavelengths of the Herschel
image. Should have units of MJy (not MJy/sr).
wavelengths : list
The wavelengths, in microns, to include in the fit
error_scaling : float or None
The amount to scale the input fluxes by to determine the errors
error_cube : None or `~numpy.ndarray`
Alternative to ``error_scaling``. A cube of errors the same size as
the image_cube.
pixelfitter : :class:`PixelFitter` or None
An instance of the :class:`PixelFitter` class to use for the fitting
(this is how guesses are specified). If None, will use defaults.
ncores : int
OPTIONAL / NOT PRESENTLY IMPLEMENTED
Allows parallelization
clobber : bool
Overwrite existing output files?
integral : bool
Also include the integral of the modified blackbody, e.g. for
luminosity determination? This increases the execution time by
a factor of 2
out_prefix : str
A prefix to prepend to the output file names
Returns
-------
t_hdu,b_hdu,n_hdu : :class:`~astropy.io.fits.HDUList`
HDUlists incorporating the best-fit values as the
`~astropy.io.fits.PrimaryHDU` image and the errors
as `~astropy.io.fits.ImageHDU` s in the first (ERROR)
extension
int_hdu : :class:`~astropy.io.fits.PrimaryHDU`
(optional) An HDU containing an image of the integral
in :math:`erg/s/cm^2`
"""
if pixelfitter is None:
pixelfitter = PixelFitter()
wavelengths_sorted = sorted(wavelengths)
# Only fit pixels with no NaNs
ok_to_fit = (np.isnan(image_cube)).max(axis=0) == 0
okcount = np.count_nonzero(ok_to_fit)
if okcount == 0:
raise ValueError("No valid pixels found.")
okx,oky = np.where(ok_to_fit)
frequencies = u.Quantity(wavelengths_sorted,u.um).to(u.Hz, u.spectral())
timg, bimg, nimg = [np.empty(ok_to_fit.shape)+np.nan for ii in range(3)]
terr, berr, nerr = [np.empty(ok_to_fit.shape)+np.nan for ii in range(3)]
if integral:
intimg = np.empty(ok_to_fit.shape)+np.nan
pb = ProgressBar(okcount)
def fitter(xy):
x,y = xy
if error_cube is not None:
error_spec = error_cube[:,x,y]
else:
error_spec = image_cube[:,x,y]*error_scaling
vals,errs = pixelfitter(frequencies,
u.Quantity(image_cube[:, x, y], u.MJy),
u.Quantity(error_spec, u.MJy),
)
timg[x,y] = vals[0]
bimg[x,y] = vals[1]
nimg[x,y] = vals[2]
terr[x,y] = errs[0]
berr[x,y] = errs[1]
nerr[x,y] = errs[2]
if integral:
integ = pixelfitter.integral(1*u.cm, 1*u.um)
vals = vals + (integ,)
intimg[x,y] = integ
pb.update()
return vals,errs,xy
if ncores > 1:
result = parallel_map(fitter, zip(okx,oky), numcores=ncores)
# need to unpack results now
for vals,errs,xy in result:
x,y = xy
timg[x,y] = vals[0]
bimg[x,y] = vals[1]
nimg[x,y] = vals[2]
terr[x,y] = errs[0]
berr[x,y] = errs[1]
nerr[x,y] = errs[2]
if integral:
intimg[x,y] = vals[3]
else:
for xy in zip(okx,oky):
fitter(xy)
t_hdu = fits.HDUList([fits.PrimaryHDU(data=timg, header=outheader),
fits.ImageHDU(data=terr, header=outheader, name='ERROR')])
t_hdu[0].header['BUNIT'] = 'K'
t_hdu[1].header['BUNIT'] = 'K'
b_hdu = fits.HDUList([fits.PrimaryHDU(data=bimg, header=outheader),
fits.ImageHDU(data=berr, header=outheader, name='ERROR')])
b_hdu[0].header['BUNIT'] = ''
b_hdu[1].header['BUNIT'] = ''
n_hdu = fits.HDUList([fits.PrimaryHDU(data=nimg, header=outheader),
fits.ImageHDU(data=nerr, header=outheader, name='ERROR')])
n_hdu[0].header['BUNIT'] = 'cm^(-2)'
n_hdu[1].header['BUNIT'] = 'cm^(-2)'
t_hdu.writeto(out_prefix+'T.fits', clobber=clobber)
b_hdu.writeto(out_prefix+'beta.fits', clobber=clobber)
n_hdu.writeto(out_prefix+'N.fits', clobber=clobber)
if integral:
int_hdu = fits.PrimaryHDU(data=intimg, header=outheader)
int_hdu.header['BUNIT'] = 'erg*s^(-1)*cm^(-2)'
int_hdu.writeto(out_prefix+'integral.fits', clobber=clobber)
return t_hdu,b_hdu,n_hdu,int_hdu
else:
return t_hdu,b_hdu,n_hdu