Source code for processing_components.skycomponent.operations

"""Function to manage sky components.

"""

import collections
import logging
from typing import Union, List

import astropy.units as u
import numpy
from astropy.convolution import Gaussian2DKernel
from astropy.coordinates import SkyCoord
from astropy.coordinates import match_coordinates_sky
from astropy.stats import gaussian_fwhm_to_sigma
from astropy.wcs.utils import pixel_to_skycoord
from astropy.wcs.utils import skycoord_to_pixel
from photutils import segmentation
from scipy import interpolate
from scipy.spatial import Voronoi

from data_models.memory_data_models import Image, Skycomponent, assert_same_chan_pol
from data_models.polarisation import PolarisationFrame
from processing_library.image.operations import create_image_from_array
from processing_library.util.array_functions import insert_function_sinc, insert_function_L, \
    insert_function_pswf, insert_array

log = logging.getLogger(__name__)


[docs]def create_skycomponent(direction: SkyCoord, flux: numpy.array, frequency: numpy.array, shape: str = 'Point', polarisation_frame=PolarisationFrame("stokesIQUV"), params: dict = None, name: str = '') \ -> Skycomponent: """ A single Skycomponent with direction, flux, shape, and params for the shape :param polarisation_frame: :param params: :param direction: :param flux: :param frequency: :param shape: 'Point' or 'Gaussian' :param name: :return: Skycomponent """ return Skycomponent( direction=direction, frequency=frequency, name=name, flux=numpy.array(flux), shape=shape, params=params, polarisation_frame=polarisation_frame)
[docs]def find_nearest_skycomponent_index(home, comps) -> int: """ Find nearest component in a list to a given direction (home) :param home: Home direction :param comps: list of skycomponents :return: index of best in comps """ catalog = SkyCoord(ra=[c.direction.ra for c in comps], dec=[c.direction.dec for c in comps]) idx, dist2d, dist3d = match_coordinates_sky(home, catalog) return idx
[docs]def find_nearest_skycomponent(home: SkyCoord, comps) -> (Skycomponent, float): """ Find nearest component to a given direction :param home: Home direction :param comps: list of skycomponents :return: Index of nearest component """ best_index = find_nearest_skycomponent_index(home, comps) best = comps[best_index] return best, best.direction.separation(home).rad
[docs]def find_separation_skycomponents(comps_test, comps_ref=None): """ Find the matrix of separations for two lists of components :param comps_test: List of components to be test :param comps_ref: If None then set to comps_test :return: """ if comps_ref is None: ncomps = len(comps_test) distances = numpy.zeros([ncomps, ncomps]) for i in range(ncomps): for j in range(i + 1, ncomps): distances[i, j] = comps_test[i].direction.separation(comps_test[j].direction).rad distances[j, i] = distances[i, j] return distances else: ncomps_ref = len(comps_ref) ncomps_test = len(comps_test) separations = numpy.zeros([ncomps_ref, ncomps_test]) for ref in range(ncomps_ref): for test in range(ncomps_test): separations[ref, test] = comps_test[test].direction.separation(comps_ref[ref].direction).rad return separations
[docs]def find_skycomponent_matches_atomic(comps_test, comps_ref, tol=1e-7): """ Match a list of candidates to a reference set of skycomponents find_skycomponent_matches is faster since it uses the astropy catalog matching many to one is allowed. :param tol: :param comps_test: :param comps_ref: :return: """ separations = find_separation_skycomponents(comps_test, comps_ref) matches = [] for test, comp_test in enumerate(comps_test): best = numpy.argmin(separations[:, test]) best_sep = separations[best, test] if best_sep < tol: matches.append((test, best, best_sep)) assert len(matches) <= len(comps_test) return matches
[docs]def find_skycomponent_matches(comps_test, comps_ref, tol=1e-7): """ Match a list of candidates to a reference set of skycomponents many to one is allowed. :param comps_test: :param comps_ref: :param tol: Tolerance in radians for a match :return: """ catalog_test = SkyCoord(ra=[c.direction.ra for c in comps_test], dec=[c.direction.dec for c in comps_test]) catalog_ref = SkyCoord(ra=[c.direction.ra for c in comps_ref], dec=[c.direction.dec for c in comps_ref]) idx, dist2d, dist3d = match_coordinates_sky(catalog_test, catalog_ref) matches = list() for test, comp_test in enumerate(comps_test): best = idx[test] best_sep = dist2d[test].rad if best_sep < tol: matches.append((test, best, best_sep)) return matches
[docs]def select_components_by_separation(home, comps, rmax=2 * numpy.pi, rmin=0.0) -> [Skycomponent]: """ Select components with a range in separation :param home: Home direction :param comps: list of skycomponents :param rmin: minimum range :param rmax: maximum range :return: selected components """ selected = list() for comp in comps: thissep = comp.direction.separation(home).rad if rmin <= thissep <= rmax: selected.append(comp) return selected
[docs]def select_neighbouring_components(comps, target_comps): """ Assign components to nearest in the target :param comps: :param target_comps: :return: Indices of components in target_comps """ target_catalog = SkyCoord([c.direction.ra.rad for c in target_comps] * u.rad, [c.direction.dec.rad for c in target_comps] * u.rad) all_catalog = SkyCoord([c.direction.ra.rad for c in comps] * u.rad, [c.direction.dec.rad for c in comps] * u.rad) from astropy.coordinates import match_coordinates_sky idx, d2d, d3d = match_coordinates_sky(all_catalog, target_catalog) return idx, d2d
[docs]def remove_neighbouring_components(comps, distance): """ Remove the faintest of a pair of components that are within a specified distance :param comps: :param target_comps: :param distance: Minimum distance :return: Indices of components in target_comps, selected components """ ncomps = len(comps) ok = ncomps * [True] for i in range(ncomps): if ok[i]: for j in range(i+1, ncomps): if ok[j]: d = comps[i].direction.separation(comps[j].direction).rad if d < distance: if numpy.max(comps[i].flux) > numpy.max(comps[j].flux): ok[j] = False else: ok[i] = False break from itertools import compress idx = list(compress(list(range(ncomps)), ok)) comps_sel = list(compress(comps, ok)) return idx, comps_sel
[docs]def find_skycomponents(im: Image, fwhm=1.0, threshold=1.0, npixels=5) -> List[Skycomponent]: """ Find gaussian components in Image above a certain threshold as Skycomponent :param im: Image to be searched :param fwhm: Full width half maximum of gaussian in pixels :param threshold: Threshold for component detection. Default: 1 Jy. :param npixels: Number of connected pixels required :return: list of sky components """ assert isinstance(im, Image) log.info("find_skycomponents: Finding components in Image by segmentation") # We use photutils segmentation - this first segments the image # into pieces that are thought to contain individual sources, then # identifies the concrete source properties. Having these two # steps makes it straightforward to extract polarisation and # spectral information. # Make filter kernel sigma = fwhm * gaussian_fwhm_to_sigma kernel = Gaussian2DKernel(sigma, x_size=int(1.5 * fwhm), y_size=int(1.5 * fwhm)) kernel.normalize() # Segment the average over all channels of Stokes I image_sum = numpy.sum(im.data, axis=0)[0, ...] / float(im.shape[0]) segments = segmentation.detect_sources(image_sum, threshold, npixels=npixels, filter_kernel=kernel) log.info("find_skycomponents: Identified %d segments" % segments.nlabels) # Now compute source properties for all polarisations and frequencies comp_tbl = [[segmentation.source_properties(im.data[chan, pol], segments, filter_kernel=kernel, wcs=im.wcs.sub([1,2])).to_table() for pol in [0]] for chan in range(im.nchan)] def comp_prop(comp, prop_name): return [[comp_tbl[chan][pol][comp][prop_name] for pol in [0]] for chan in range(im.nchan)] # Generate components comps = [] for segment in range(segments.nlabels): # Get flux and position. Astropy's quantities make this # unnecessarily complicated. flux = numpy.array(comp_prop(segment, "max_value")) # These values seem inconsistent with the xcentroid, and ycentroid values # ras = u.Quantity(list(map(u.Quantity, # comp_prop(segment, "ra_icrs_centroid")))) # decs = u.Quantity(list(map(u.Quantity, # comp_prop(segment, "dec_icrs_centroid")))) xs = u.Quantity(list(map(u.Quantity, comp_prop(segment, "xcentroid")))) ys = u.Quantity(list(map(u.Quantity, comp_prop(segment, "ycentroid")))) sc = pixel_to_skycoord(xs, ys, im.wcs, 0) ras = sc.ra decs = sc.dec # Remove NaNs from RA/DEC (happens if there is no flux in that # polarsiation/channel) # ras[numpy.isnan(ras)] = 0.0 # decs[numpy.isnan(decs)] = 0.0 # Determine "true" position by weighting flux_sum = numpy.sum(flux) ra = numpy.sum(flux * ras) / flux_sum dec = numpy.sum(flux * decs) / flux_sum xs = numpy.sum(flux * xs) / flux_sum ys = numpy.sum(flux * ys) / flux_sum point_flux = im.data[:, :, numpy.round(ys.value).astype('int'), numpy.round(xs.value).astype('int')] # Add component comps.append(Skycomponent( direction=SkyCoord(ra=ra, dec=dec), frequency=im.frequency, name="Segment %d" % segment, flux=point_flux, shape='Point', polarisation_frame=im.polarisation_frame, params={})) return comps
[docs]def apply_beam_to_skycomponent(sc: Union[Skycomponent, List[Skycomponent]], beam: Image) \ -> Union[Skycomponent, List[Skycomponent]]: """ Insert a Skycomponent into an image :param beam: :param sc: SkyComponent or list of SkyComponents :return: List of skycomponents """ assert isinstance(beam, Image) single = not isinstance(sc, collections.Iterable) if single: sc = [sc] nchan, npol, ny, nx = beam.shape log.debug('apply_beam_to_skycomponent: Processing %d components' % (len(sc))) ras = [comp.direction.ra.radian for comp in sc] decs = [comp.direction.dec.radian for comp in sc] skycoords = SkyCoord(ras * u.rad, decs * u.rad, frame='icrs') pixlocs = skycoord_to_pixel(skycoords, beam.wcs, origin=1, mode='wcs') newsc = [] total_flux = numpy.zeros([nchan, npol]) for icomp, comp in enumerate(sc): assert comp.shape == 'Point', "Cannot handle shape %s" % comp.shape assert_same_chan_pol(beam, comp) pixloc = (pixlocs[0][icomp], pixlocs[1][icomp]) if not numpy.isnan(pixloc).any(): x, y = int(round(float(pixloc[0]))), int(round(float(pixloc[1]))) if 0 <= x < nx and 0 <= y < ny: comp.flux[:, :] *= beam.data[:, :, y, x] total_flux += comp.flux newsc.append(Skycomponent(comp.direction, comp.frequency, comp.name, comp.flux, shape=comp.shape, polarisation_frame=comp.polarisation_frame)) log.debug('apply_beam_to_skycomponent: %d components with total flux %s' % (len(newsc), total_flux)) if single: return newsc[0] else: return newsc
[docs]def filter_skycomponents_by_index(sc, indices): """Filter sky components by index :param sc: :param indices: :return: """ newcomps = list() for i in indices: newcomps.append(sc[i]) return newcomps
[docs]def filter_skycomponents_by_flux(sc, flux_min=-numpy.inf, flux_max=numpy.inf): """Filter sky components by stokes I flux :param sc: :param flux_min: :param flux_max: :return: """ newcomps = list() for comp in sc: if (numpy.max(comp.flux[:, 0]) > flux_min) and (numpy.max(comp.flux[:, 0]) < flux_max): newcomps.append(comp) return newcomps
[docs]def insert_skycomponent(im: Image, sc: Union[Skycomponent, List[Skycomponent]], insert_method='Nearest', bandwidth=1.0, support=8) -> Image: """ Insert a Skycomponent into an image :param im: :param sc: SkyComponent or list of SkyComponents :param insert_method: '' | 'Sinc' | 'Lanczos' :param bandwidth: Fractional of uv plane to optimise over (1.0) :param support: Support of kernel (7) :return: image """ assert isinstance(im, Image) support = int(support / bandwidth) nchan, npol, ny, nx = im.data.shape if not isinstance(sc, collections.Iterable): sc = [sc] log.debug("insert_skycomponent: Using insert method %s" % insert_method) image_frequency = im.frequency ras = [comp.direction.ra.radian for comp in sc] decs = [comp.direction.dec.radian for comp in sc] skycoords = SkyCoord(ras * u.rad, decs * u.rad, frame='icrs') pixlocs = skycoord_to_pixel(skycoords, im.wcs, origin=0, mode='wcs') for icomp, comp in enumerate(sc): assert comp.shape == 'Point', "Cannot handle shape %s" % comp.shape assert_same_chan_pol(im, comp) pixloc = (pixlocs[0][icomp], pixlocs[1][icomp]) flux = numpy.zeros([nchan, npol]) if comp.flux.shape[0] > 1: for pol in range(npol): fint = interpolate.interp1d(comp.frequency, comp.flux[:, pol], kind="cubic") flux[:, pol] = fint(image_frequency) else: flux = comp.flux if insert_method == "Lanczos": insert_array(im.data, pixloc[0], pixloc[1], flux, bandwidth, support, insert_function=insert_function_L) elif insert_method == "Sinc": insert_array(im.data, pixloc[0], pixloc[1], flux, bandwidth, support, insert_function=insert_function_sinc) elif insert_method == "PSWF": insert_array(im.data, pixloc[0], pixloc[1], flux, bandwidth, support, insert_function=insert_function_pswf) else: insert_method = 'Nearest' y, x = numpy.round(pixloc[1]).astype('int'), numpy.round(pixloc[0]).astype('int') if 0 <= x < nx and 0 <= y < ny: im.data[:, :, y, x] += flux[...] return im
[docs]def voronoi_decomposition(im, comps): """Construct a Voronoi decomposition of a set of components The array return contains the index into the Voronoi structure :param im: :param comps: :return: Voronoi structure, vertex image """ def voronoi_vertex(vy, vx, vertex_y, vertex_x): """ Return the nearest Voronoi vertex :param vy: :param vx: :param vertex_y: :param vertex_x: :return: """ return numpy.argmin(numpy.hypot(vy - vertex_y, vx - vertex_x)) directions = SkyCoord([u.rad * c.direction.ra.rad for c in comps], [u.rad * c.direction.dec.rad for c in comps]) x, y = skycoord_to_pixel(directions, im.wcs, 0, 'wcs') points = [(x[i], y[i]) for i, _ in enumerate(x)] vor = Voronoi(points) nchan, npol, ny, nx = im.shape vertex_image = numpy.zeros([ny, nx]).astype('int') for j in range(ny): for i in range(nx): vertex_image[j, i] = voronoi_vertex(j, i, vor.points[:, 1], vor.points[:, 0]) return vor, vertex_image
[docs]def image_voronoi_iter(im: Image, components: Skycomponent) -> collections.Iterable: """Iterate through Voronoi decomposition, returning a generator yielding fullsize images :param im: Image :param components: Components to define Voronoi decomposition """ if len(components) == 1: mask = numpy.ones(im.data.shape) yield create_image_from_array(mask, wcs=im.wcs, polarisation_frame=im.polarisation_frame) else: vor, vertex_array = voronoi_decomposition(im, components) nregions = numpy.max(vertex_array) + 1 for region in range(nregions): mask = numpy.zeros(im.data.shape) mask[(vertex_array == region)[numpy.newaxis, numpy.newaxis,...]] = 1.0 yield create_image_from_array(mask, wcs=im.wcs, polarisation_frame=im.polarisation_frame)
[docs]def partition_skycomponent_neighbours(comps, targets): """ Partition sky components by nearest target source :param comps: :param targets: :return: """ idx, d2d = select_neighbouring_components(comps, targets) from itertools import compress comps_lists = list() for comp_id in numpy.unique(idx): selected_comps = list(compress(comps, idx == comp_id)) comps_lists.append(selected_comps) return comps_lists