Source code for processing_components.image.deconvolution

""" Image deconvolution functions

The standard deconvolution algorithms are provided:

    hogbom: Hogbom CLEAN See: Hogbom CLEAN A&A Suppl, 15, 417, (1974)
    
    msclean: MultiScale CLEAN See: Cornwell, T.J., Multiscale CLEAN (IEEE Journal of Selected Topics in Sig Proc,
    2008 vol. 2 pp. 793-801)

    mfsmsclean: MultiScale Multi-Frequency See: U. Rau and T. J. Cornwell, “A multi-scale multi-frequency
    deconvolution algorithm for synthesis imaging in radio interferometry,” A&A 532, A71 (2011).

For example to make dirty image and PSF, deconvolve, and then restore::

    model = create_image_from_visibility(vt, cellsize=0.001, npixel=256)
    dirty, sumwt = invert_2d(vt, model)
    psf, sumwt = invert_2d(vt, model, dopsf=True)

    comp, residual = deconvolve_cube(dirty, psf, niter=1000, threshold=0.001, fracthresh=0.01, window='quarter',
                                 gain=0.7, algorithm='msclean', scales=[0, 3, 10, 30])

    restored = restore_cube(comp, psf, residual)

"""

import logging

from data_models.polarisation import PolarisationFrame

import numpy
from astropy.convolution import Gaussian2DKernel, convolve_fft
from photutils import fit_2dgaussian

from data_models.memory_data_models import Image
from data_models.parameters import get_parameter
from processing_library.arrays.cleaners import hogbom, hogbom_complex, msclean, msmfsclean
from processing_library.image.operations import create_image_from_array, copy_image
from ..image.operations import calculate_image_frequency_moments, calculate_image_from_frequency_moments

log = logging.getLogger(__name__)


[docs]def deconvolve_cube(dirty: Image, psf: Image, prefix='', **kwargs) -> (Image, Image): """ Clean using a variety of algorithms Functions that clean a dirty image using a point spread function. The algorithms available are: hogbom: Hogbom CLEAN See: Hogbom CLEAN A&A Suppl, 15, 417, (1974) msclean: MultiScale CLEAN See: Cornwell, T.J., Multiscale CLEAN (IEEE Journal of Selected Topics in Sig Proc, 2008 vol. 2 pp. 793-801) mfsmsclean, msmfsclean, mmclean: MultiScale Multi-Frequency See: U. Rau and T. J. Cornwell, “A multi-scale multi-frequency deconvolution algorithm for synthesis imaging in radio interferometry,” A&A 532, A71 (2011). For example:: comp, residual = deconvolve_cube(dirty, psf, niter=1000, gain=0.7, algorithm='msclean', scales=[0, 3, 10, 30], threshold=0.01) For the MFS clean, the psf must have number of channels >= 2 * nmoment :param dirty: Image dirty image :param psf: Image Point Spread Function :param window_shape: Window image (Bool) - clean where True :param mask: Window in the form of an image, overrides woindow_shape :param algorithm: Cleaning algorithm: 'msclean'|'hogbom'|'mfsmsclean' :param gain: loop gain (float) 0.7 :param threshold: Clean threshold (0.0) :param fractional_threshold: Fractional threshold (0.01) :param scales: Scales (in pixels) for multiscale ([0, 3, 10, 30]) :param nmoment: Number of frequency moments (default 3) :param findpeak: Method of finding peak in mfsclean: 'Algorithm1'|'ASKAPSoft'|'CASA'|'ARL', Default is ARL. :return: componentimage, residual """ assert isinstance(dirty, Image), dirty assert isinstance(psf, Image), psf window_shape = get_parameter(kwargs, 'window_shape', None) if window_shape == 'quarter': log.info("deconvolve_cube %s: window is inner quarter" % prefix) qx = dirty.shape[3] // 4 qy = dirty.shape[2] // 4 window = numpy.zeros_like(dirty.data) window[..., (qy + 1):3 * qy, (qx + 1):3 * qx] = 1.0 log.info('deconvolve_cube %s: Cleaning inner quarter of each sky plane' % prefix) elif window_shape == 'no_edge': edge = get_parameter(kwargs, 'window_edge', 16) nx = dirty.shape[3] ny = dirty.shape[2] window = numpy.zeros_like(dirty.data) window[..., (edge + 1):(ny - edge), (edge + 1):(nx - edge)] = 1.0 log.info('deconvolve_cube %s: Window omits %d-pixel edge of each sky plane' % (prefix, edge)) elif window_shape is None: log.info("deconvolve_cube %s: Cleaning entire image" % prefix) window = None else: raise ValueError("Window shape %s is not recognized" % window_shape) mask = get_parameter(kwargs, 'mask', None) if isinstance(mask, Image): if window is not None: log.warning('deconvolve_cube %s: Overriding window_shape with mask image' % (prefix)) window = mask.data psf_support = get_parameter(kwargs, 'psf_support', max(dirty.shape[2] // 2, dirty.shape[3] // 2)) if (psf_support <= psf.shape[2] // 2) and ((psf_support <= psf.shape[3] // 2)): centre = [psf.shape[2] // 2, psf.shape[3] // 2] psf.data = psf.data[..., (centre[0] - psf_support):(centre[0] + psf_support), (centre[1] - psf_support):(centre[1] + psf_support)] log.info('deconvolve_cube %s: PSF support = +/- %d pixels' % (prefix, psf_support)) log.info('deconvolve_cube %s: PSF shape %s' % (prefix, str(psf.data.shape))) algorithm = get_parameter(kwargs, 'algorithm', 'msclean') if algorithm == 'msclean': log.info("deconvolve_cube %s: Multi-scale clean of each polarisation and channel separately" % prefix) gain = get_parameter(kwargs, 'gain', 0.7) assert 0.0 < gain < 2.0, "Loop gain must be between 0 and 2" thresh = get_parameter(kwargs, 'threshold', 0.0) assert thresh >= 0.0 niter = get_parameter(kwargs, 'niter', 100) assert niter > 0 scales = get_parameter(kwargs, 'scales', [0, 3, 10, 30]) fracthresh = get_parameter(kwargs, 'fractional_threshold', 0.01) assert 0.0 < fracthresh < 1.0 comp_array = numpy.zeros_like(dirty.data) residual_array = numpy.zeros_like(dirty.data) for channel in range(dirty.data.shape[0]): for pol in range(dirty.data.shape[1]): if psf.data[channel, pol, :, :].max(): log.info("deconvolve_cube %s: Processing pol %d, channel %d" % (prefix, pol, channel)) if window is None: comp_array[channel, pol, :, :], residual_array[channel, pol, :, :] = \ msclean(dirty.data[channel, pol, :, :], psf.data[channel, pol, :, :], None, gain, thresh, niter, scales, fracthresh, prefix) else: comp_array[channel, pol, :, :], residual_array[channel, pol, :, :] = \ msclean(dirty.data[channel, pol, :, :], psf.data[channel, pol, :, :], window[channel, pol, :, :], gain, thresh, niter, scales, fracthresh, prefix) else: log.info("deconvolve_cube %s: Skipping pol %d, channel %d" % (prefix, pol, channel)) comp_image = create_image_from_array(comp_array, dirty.wcs, dirty.polarisation_frame) residual_image = create_image_from_array(residual_array, dirty.wcs, dirty.polarisation_frame) elif algorithm == 'msmfsclean' or algorithm == 'mfsmsclean' or algorithm == 'mmclean': findpeak = get_parameter(kwargs, "findpeak", 'ARL') log.info("deconvolve_cube %s: Multi-scale multi-frequency clean of each polarisation separately" % prefix) nmoment = get_parameter(kwargs, "nmoment", 3) assert nmoment >= 1, "Number of frequency moments must be greater than or equal to one" nchan = dirty.shape[0] assert nchan > 2 * (nmoment - 1), "Require nchan %d > 2 * (nmoment %d - 1)" % (nchan, 2 * (nmoment - 1)) dirty_taylor = calculate_image_frequency_moments(dirty, nmoment=nmoment) if nmoment > 1: psf_taylor = calculate_image_frequency_moments(psf, nmoment=2 * nmoment) else: psf_taylor = calculate_image_frequency_moments(psf, nmoment=1) psf_peak = numpy.max(psf_taylor.data) dirty_taylor.data /= psf_peak psf_taylor.data /= psf_peak log.info("deconvolve_cube %s: Shape of Dirty moments image %s" % (prefix, str(dirty_taylor.shape))) log.info("deconvolve_cube %s: Shape of PSF moments image %s" % (prefix, str(psf_taylor.shape))) gain = get_parameter(kwargs, 'gain', 0.7) assert 0.0 < gain < 2.0, "Loop gain must be between 0 and 2" thresh = get_parameter(kwargs, 'threshold', 0.0) assert thresh >= 0.0 niter = get_parameter(kwargs, 'niter', 100) assert niter > 0 scales = get_parameter(kwargs, 'scales', [0, 3, 10, 30]) fracthresh = get_parameter(kwargs, 'fractional_threshold', 0.1) assert 0.0 < fracthresh < 1.0 comp_array = numpy.zeros(dirty_taylor.data.shape) residual_array = numpy.zeros(dirty_taylor.data.shape) for pol in range(dirty_taylor.data.shape[1]): if psf_taylor.data[0, pol, :, :].max(): log.info("deconvolve_cube %s: Processing pol %d" % (prefix, pol)) if window is None: comp_array[:, pol, :, :], residual_array[:, pol, :, :] = \ msmfsclean(dirty_taylor.data[:, pol, :, :], psf_taylor.data[:, pol, :, :], None, gain, thresh, niter, scales, fracthresh, findpeak, prefix) else: log.info('deconvolve_cube %s: Clean window has %d valid pixels' % (prefix, int(numpy.sum(window[0,pol])))) comp_array[:, pol, :, :], residual_array[:, pol, :, :] = \ msmfsclean(dirty_taylor.data[:, pol, :, :], psf_taylor.data[:, pol, :, :], window[0, pol, :, :], gain, thresh, niter, scales, fracthresh, findpeak, prefix) else: log.info("deconvolve_cube %s: Skipping pol %d" % (prefix, pol)) comp_image = create_image_from_array(comp_array, dirty_taylor.wcs, dirty.polarisation_frame) residual_image = create_image_from_array(residual_array, dirty_taylor.wcs, dirty.polarisation_frame) return_moments = get_parameter(kwargs, "return_moments", False) if not return_moments: log.info("deconvolve_cube %s: calculating spectral cubes" % prefix) comp_image = calculate_image_from_frequency_moments(dirty, comp_image) residual_image = calculate_image_from_frequency_moments(dirty, residual_image) else: log.info("deconvolve_cube %s: constructed moment cubes" % prefix) elif algorithm == 'hogbom': log.info("deconvolve_cube %s: Hogbom clean of each polarisation and channel separately" % prefix) gain = get_parameter(kwargs, 'gain', 0.7) assert 0.0 < gain < 2.0, "Loop gain must be between 0 and 2" thresh = get_parameter(kwargs, 'threshold', 0.0) assert thresh >= 0.0 niter = get_parameter(kwargs, 'niter', 100) assert niter > 0 fracthresh = get_parameter(kwargs, 'fractional_threshold', 0.1) assert 0.0 < fracthresh < 1.0 comp_array = numpy.zeros(dirty.data.shape) residual_array = numpy.zeros(dirty.data.shape) for channel in range(dirty.data.shape[0]): for pol in range(dirty.data.shape[1]): if psf.data[channel, pol, :, :].max(): log.info("deconvolve_cube %s: Processing pol %d, channel %d" % (prefix, pol, channel)) if window is None: comp_array[channel, pol, :, :], residual_array[channel, pol, :, :] = \ hogbom(dirty.data[channel, pol, :, :], psf.data[channel, pol, :, :], None, gain, thresh, niter, fracthresh, prefix) else: comp_array[channel, pol, :, :], residual_array[channel, pol, :, :] = \ hogbom(dirty.data[channel, pol, :, :], psf.data[channel, pol, :, :], window[channel, pol, :, :], gain, thresh, niter, fracthresh, prefix) else: log.info("deconvolve_cube %s: Skipping pol %d, channel %d" % (prefix, pol, channel)) comp_image = create_image_from_array(comp_array, dirty.wcs, dirty.polarisation_frame) residual_image = create_image_from_array(residual_array, dirty.wcs, dirty.polarisation_frame) elif algorithm == 'hogbom-complex': log.info("deconvolve_cube_complex: Hogbom-complex clean of each polarisation and channel separately") gain = get_parameter(kwargs, 'gain', 0.7) assert 0.0 < gain < 2.0, "Loop gain must be between 0 and 2" thresh = get_parameter(kwargs, 'threshold', 0.0) assert thresh >= 0.0 niter = get_parameter(kwargs, 'niter', 100) assert niter > 0 fracthresh = get_parameter(kwargs, 'fractional_threshold', 0.1) assert 0.0 <= fracthresh < 1.0 comp_array = numpy.zeros(dirty.data.shape) residual_array = numpy.zeros(dirty.data.shape) for channel in range(dirty.data.shape[0]): for pol in range(dirty.data.shape[1]): if pol == 0 or pol == 3: if psf.data[channel, pol, :, :].max(): log.info("deconvolve_cube_complex: Processing pol %d, channel %d" % (pol, channel)) if window is None: comp_array[channel, pol, :, :], residual_array[channel, pol, :, :] = \ hogbom(dirty.data[channel, pol, :, :], psf.data[channel, pol, :, :], None, gain, thresh, niter, fracthresh) else: comp_array[channel, pol, :, :], residual_array[channel, pol, :, :] = \ hogbom(dirty.data[channel, pol, :, :], psf.data[channel, pol, :, :], window[channel, pol, :, :], gain, thresh, niter, fracthresh) else: log.info("deconvolve_cube_complex: Skipping pol %d, channel %d" % (pol, channel)) if pol == 1: if psf.data[channel, 1:2, :, :].max(): log.info("deconvolve_cube_complex: Processing pol 1 and 2, channel %d" % (channel)) if window is None: comp_array[channel, 1, :, :], comp_array[channel, 2, :, :], residual_array[channel, 1, :, :], residual_array[channel, 2, :, :] = hogbom_complex( dirty.data[channel, 1, :, :], dirty.data[channel, 2, :, :], psf.data[channel, 1, :, :], psf.data[channel, 2, :, :], None, gain, thresh, niter, fracthresh) else: comp_array[channel, 1, :, :], comp_array[channel, 2, :, :], residual_array[channel, 1, :, :], residual_array[channel, 2, :, :] = hogbom_complex( dirty.data[channel, 1, :, :], dirty.data[channel, 2, :, :], psf.data[channel, 1, :, :], psf.data[channel, 2, :, :], window[channel, pol, :, :], gain, thresh, niter, fracthresh) else: log.info("deconvolve_cube_complex: Skipping pol 1 and 2, channel %d" % (channel)) if pol == 2: continue comp_image = create_image_from_array(comp_array, dirty.wcs, polarisation_frame=PolarisationFrame('stokesIQUV')) residual_image = create_image_from_array(residual_array, dirty.wcs, polarisation_frame=PolarisationFrame('stokesIQUV')) else: raise ValueError('deconvolve_cube %s: Unknown algorithm %s' % (prefix, algorithm)) return comp_image, residual_image
[docs]def restore_cube(model: Image, psf: Image, residual=None, **kwargs) -> Image: """ Restore the model image to the residuals :params psf: Input PSF :return: restored image """ assert isinstance(model, Image), model assert isinstance(psf, Image), psf assert residual is None or isinstance(residual, Image), residual restored = copy_image(model) npixel = psf.data.shape[3] sl = slice(npixel // 2 - 7, npixel // 2 + 8) size = get_parameter(kwargs, "psfwidth", None) if size is None: # isotropic at the moment! from scipy.optimize import minpack try: fit = fit_2dgaussian(psf.data[0, 0, sl, sl]) if fit.x_stddev <= 0.0 or fit.y_stddev <= 0.0: log.debug('restore_cube: error in fitting to psf, using 1 pixel stddev') size = 1.0 else: size = max(fit.x_stddev, fit.y_stddev) log.debug('restore_cube: psfwidth = %s' % (size)) except minpack.error as err: log.debug('restore_cube: minpack error, using 1 pixel stddev') size = 1.0 except ValueError as err: log.debug('restore_cube: warning in fit to psf, using 1 pixel stddev') size = 1.0 else: log.debug('restore_cube: Using specified psfwidth = %s' % (size)) # By convention, we normalise the peak not the integral so this is the volume of the Gaussian norm = 2.0 * numpy.pi * size ** 2 gk = Gaussian2DKernel(size) for chan in range(model.shape[0]): for pol in range(model.shape[1]): restored.data[chan, pol, :, :] = norm * convolve_fft(model.data[chan, pol, :, :], gk, normalize_kernel=False, allow_huge=True) if residual is not None: restored.data += residual.data return restored