# Source code for processing_components.visibility.coalesce

"""
Functions for visibility coalescence and decoalescence.

The BlockVisibility format describes the visibility
data_models as it would come from the correlator: [time, ant2, ant1, channel, pol]. This is well-suited to
calibration and some visibility processing such as continuum removal. However the BlockVisibility format
is vastly oversampled on the short spacings where the visibility (after calibration) varies slowly compared to
the longest baselines. The coalescence operation resamples the visibility at a rate inversely proportional
to baseline length. This cannot be held in the BlockVIsibility format so it is stored in the Visibility
to baseline length. This cannot be held in the BlockVisibility format so it is stored in the Visibility
format. For e.g. SKA1-LOW, coalescing typically reduces the number of visibilities by a factor between 10 and 30.

A typical use might be::

    vt = predict_skycomponent_visibility(vt, comps)
    cvt = convert_blockvisibility_to_visibility(vt, time_coal=1.0, max_time_coal=100, frequency_coal=0.0,
        max_frequency_coal=1)
    dirtyimage, sumwt = invert_2d(cvt, model)

"""

import numpy

from astropy import constants

from processing_library.util.array_functions import average_chunks, average_chunks2

from data_models.memory_data_models import Visibility, BlockVisibility
from data_models.parameters import get_parameter

from ..visibility.base import vis_summary, copy_visibility

import logging

log = logging.getLogger(__name__)


def coalesce_visibility(vis: BlockVisibility, **kwargs) -> Visibility:
    """Coalesce a BlockVisibility into a Visibility, as needed for imaging.

    Coalescence is (optional) baseline-dependent averaging: the number of
    integrations averaged scales as the ratio of the maximum possible baseline
    length to the length of each baseline.  The factors are scaled by
    time_coal/frequency_coal and capped by max_time_coal/max_frequency_coal.
    When faceting, the coalescence factors should be roughly the same as the
    number of facets on one axis.  If both factors are 0.0 then only a format
    conversion is performed.

    :param vis: BlockVisibility to be coalesced
    :return: Coalesced visibility with cindex and blockvis filled in
    """
    assert isinstance(vis, BlockVisibility), "vis is not a BlockVisibility: %r" % vis

    time_coal = get_parameter(kwargs, 'time_coal', 0.0)
    max_time_coal = get_parameter(kwargs, 'max_time_coal', 100)
    frequency_coal = get_parameter(kwargs, 'frequency_coal', 0.0)
    max_frequency_coal = get_parameter(kwargs, 'max_frequency_coal', 100)

    # No averaging requested: a plain format conversion suffices.
    if time_coal == 0.0 and frequency_coal == 0.0:
        return convert_blockvisibility_to_visibility(vis)

    averaged = average_in_blocks(vis.data['vis'], vis.data['uvw'], vis.data['weight'],
                                 vis.data['imaging_weight'], vis.time, vis.integration_time,
                                 vis.frequency, vis.channel_bandwidth, time_coal, max_time_coal,
                                 frequency_coal, max_frequency_coal)
    (cvis, cuvw, cwts, cimwt, ctime, cfrequency, cchannel_bandwidth,
     ca1, ca2, cintegration_time, cindex) = averaged

    coalesced_vis = Visibility(uvw=cuvw, time=ctime, frequency=cfrequency,
                               channel_bandwidth=cchannel_bandwidth,
                               phasecentre=vis.phasecentre, antenna1=ca1, antenna2=ca2,
                               vis=cvis, weight=cwts, imaging_weight=cimwt,
                               configuration=vis.configuration,
                               integration_time=cintegration_time,
                               polarisation_frame=vis.polarisation_frame,
                               cindex=cindex, blockvis=vis, meta=vis.meta)

    log.debug(
        'coalesce_visibility: Created new Visibility for coalesced data_models, coalescence factors (t,f) = (%.3f,%.3f)'
        % (time_coal, frequency_coal))
    log.debug('coalesce_visibility: Maximum coalescence (t,f) = (%d, %d)' % (max_time_coal, max_frequency_coal))
    log.debug('coalesce_visibility: Original %s, coalesced %s' % (vis_summary(vis), vis_summary(coalesced_vis)))
    return coalesced_vis
def convert_blockvisibility_to_visibility(vis: BlockVisibility) -> Visibility:
    """Convert a BlockVisibility to Visibility format with no coalescence.

    :param vis: BlockVisibility to be converted
    :return: Visibility with cindex and blockvis filled in
    """
    assert isinstance(vis, BlockVisibility), "vis is not a BlockVisibility: %r" % vis

    converted = convert_blocks(vis.data['vis'], vis.data['uvw'], vis.data['weight'],
                               vis.data['imaging_weight'], vis.time, vis.integration_time,
                               vis.frequency, vis.channel_bandwidth)
    (cvis, cuvw, cwts, cimaging_wts, ctime, cfrequency, cchannel_bandwidth,
     ca1, ca2, cintegration_time, cindex) = converted

    converted_vis = Visibility(uvw=cuvw, time=ctime, frequency=cfrequency,
                               channel_bandwidth=cchannel_bandwidth,
                               phasecentre=vis.phasecentre, antenna1=ca1, antenna2=ca2,
                               vis=cvis, weight=cwts, imaging_weight=cimaging_wts,
                               configuration=vis.configuration,
                               integration_time=cintegration_time,
                               polarisation_frame=vis.polarisation_frame,
                               cindex=cindex, blockvis=vis, meta=vis.meta)

    log.debug('convert_visibility: Original %s, converted %s' % (vis_summary(vis), vis_summary(converted_vis)))
    return converted_vis
def decoalesce_visibility(vis: Visibility, **kwargs) -> BlockVisibility:
    """ Decoalesce the visibilities to the original values (opposite of coalesce_visibility)

    This relies upon the block vis and the index being part of the vis. Needs the index
    generated by coalesce_visibility.

    :param vis: (Coalesced visibility)
    :return: BlockVisibility with vis and weight columns overwritten
    """
    assert isinstance(vis, Visibility), "vis is not a Visibility: %r" % vis
    assert isinstance(vis.blockvis, BlockVisibility), "No blockvisibility in vis %r" % vis
    assert vis.cindex is not None, "No reverse index in Visibility %r" % vis

    log.debug('decoalesce_visibility: Created new Visibility for decoalesced data_models')
    decomp_vis = copy_visibility(vis.blockvis)

    vshape = decomp_vis.data['vis'].shape
    npol = vshape[-1]
    # Total number of scalar visibility elements in the block. Computed directly rather
    # than allocating a throwaway zeros array just to read its .size.
    nelements = int(numpy.prod(vshape))
    assert numpy.max(vis.cindex) < nelements
    assert numpy.max(vis.cindex) < vis.vis.shape[0], "Incorrect template used in decoalescing"

    # cindex maps each (time, ant2, ant1, chan) element of the block to the coalesced
    # row it was averaged into; copy that row's npol values back into the block.
    # BUG FIX: the flat slice must advance npol elements per index. The previous
    # form flat[i:i + npol] stepped by one element, so successive polarisation
    # groups overlapped and overwrote each other.
    for i in range(nelements // npol):
        decomp_vis.data['vis'].flat[i * npol:(i + 1) * npol] = vis.data['vis'][vis.cindex[i]]
        decomp_vis.data['weight'].flat[i * npol:(i + 1) * npol] = vis.data['weight'][vis.cindex[i]]
        decomp_vis.data['imaging_weight'].flat[i * npol:(i + 1) * npol] = \
            vis.data['imaging_weight'][vis.cindex[i]]

    log.debug('decoalesce_visibility: Coalesced %s, decoalesced %s' % (vis_summary(vis),
                                                                      vis_summary(decomp_vis)))
    return decomp_vis
def average_in_blocks(vis, uvw, wts, imaging_wts, times, integration_time, frequency, channel_bandwidth,
                      time_coal=1.0, max_time_coal=100, frequency_coal=1.0, max_frequency_coal=100):
    """ Average visibility in blocks, with baseline-dependent averaging factors.

    The input arrays have BlockVisibility layout [ntimes, nant, nant, nchan, npol]; the
    outputs are flattened Visibility-style rows plus a reverse index for decoalescence.

    :param vis: complex visibilities [ntimes, nant, nant, nchan, npol]
    :param uvw: uvw coordinates [ntimes, nant, nant, 3]
    :param wts: weights, same shape as vis
    :param imaging_wts: imaging weights, same shape as vis
    :param times: time of each integration [ntimes]
    :param integration_time: integration time per sample [ntimes]
    :param frequency: channel frequencies [nchan]
    :param channel_bandwidth: bandwidth per channel [nchan]
    :param time_coal: scaling of the time averaging factor
    :param max_time_coal: cap on the number of integrations averaged
    :param frequency_coal: scaling of the frequency averaging factor
    :param max_frequency_coal: cap on the number of channels averaged
    :return: (cvis, cuvw, cwts, cimwts, ctime, cfrequency, cchannel_bandwidth, ca1, ca2,
              cintegration_time, cindex)
    """
    # Calculate the averaging factors for time and frequency, making them the same for
    # all times on a given baseline: find the maximum possible baseline and scale the
    # factor by uvmax/uvdist, so short baselines are averaged more heavily.
    ntimes, nant, _, nchan, npol = vis.shape

    # In-place dtype reinterpretation of the times buffer; presumably times is already
    # float64 so this is a no-op view change — TODO confirm for other input dtypes.
    times.dtype = numpy.float64

    # Weight sums via einsum (equivalent to numpy.sum over the trailing/leading axes):
    # polarisation-independent [ntimes, nant, nant, nchan],
    allpwtsgrid = numpy.einsum('ijklm->ijkl', wts, optimize=True)
    # polarisation- and frequency-independent [ntimes, nant, nant],
    allcpwtsgrid = numpy.einsum('ijkl->ijk', allpwtsgrid, optimize=True)
    # polarisation- and time-independent [nant, nant, nchan].
    alltpwtsgrid = numpy.einsum('ijkl->jkl', allpwtsgrid, optimize=True)

    # Per-baseline averaging factors, clamped to [1, max_*_coal].
    time_average = numpy.ones([nant, nant], dtype='int')
    frequency_average = numpy.ones([nant, nant], dtype='int')
    ua = numpy.arange(nant)

    # uv distance (w excluded) per [time, ant2, ant1]; uvmax is the global maximum,
    # uvdist_max the per-baseline maximum over time.
    uvwd = uvw[..., 0:2]
    uvdist = numpy.einsum('ijkm,ijkm->ijk', uvwd, uvwd, optimize=True)
    uvmax = numpy.sqrt(numpy.max(uvdist))
    uvdist_max = numpy.sqrt(numpy.max(uvdist, axis=0))
    # Total weight per baseline, used to zero the factors for entirely unweighted baselines.
    allpwtsgrid_bool = numpy.einsum('ijklm->jk', wts, optimize=True)
    mask = numpy.where(uvdist_max > 0.)
    mask0 = numpy.where(uvdist_max <= 0.)
    # Factor ~ uvmax/uvdist: zero-length baselines get the maximum averaging.
    time_average[mask] = numpy.round((time_coal * uvmax / uvdist_max[mask]))
    # NOTE(review): assigning .dtype reinterprets the buffer in place; harmless only
    # where the native 'int' is already int64 — verify on 32-bit/Windows builds.
    time_average.dtype = numpy.int64
    time_average[mask0] = max_time_coal
    # NOTE(review): the zeroing below is immediately undone by the <1 clamp that
    # follows, so unweighted baselines end up with factor 1, not 0 — confirm intent.
    numpy.putmask(time_average, allpwtsgrid_bool == 0, 0)
    numpy.putmask(time_average, time_average < 1, 1)
    numpy.putmask(time_average, time_average > max_time_coal, max_time_coal)
    # Same calculation for the frequency axis.
    frequency_average[mask] = numpy.round((frequency_coal * uvmax / uvdist_max[mask]))
    frequency_average.dtype = numpy.int64
    frequency_average[mask0] = max_frequency_coal
    numpy.putmask(frequency_average, allpwtsgrid_bool == 0, 0)
    numpy.putmask(frequency_average, frequency_average < 1, 1)
    numpy.putmask(frequency_average, frequency_average > max_frequency_coal, max_frequency_coal)

    # See how many time and frequency chunks we need for each baseline. To do this we
    # use the same averaging that we will use later for the actual data_models, which
    # tells us the number of chunks (and hence output rows) per baseline.
    frequency_grid, time_grid = numpy.meshgrid(frequency, times)
    channel_bandwidth_grid, integration_time_grid = numpy.meshgrid(channel_bandwidth, integration_time)
    cnvis = 0
    time_chunk_len = numpy.ones([nant, nant], dtype='int')
    frequency_chunk_len = numpy.ones([nant, nant], dtype='int')
    for a2 in ua:
        for a1 in ua:
            # NOTE(review): '&' binds tighter than '>', so the right-hand side is
            # frequency_average > (0 & (...any() > 0.0)) == frequency_average > 0;
            # the weights check is effectively dropped — confirm intended condition.
            if (time_average[a2, a1] > 0) & (frequency_average[a2, a1] > 0
                                             & (allpwtsgrid[:, a2, a1, ...].any() > 0.0)):
                time_chunks, _ = average_chunks(times, allcpwtsgrid[:, a2, a1], time_average[a2, a1])
                time_chunk_len[a2, a1] = time_chunks.shape[0]
                frequency_chunks, _ = average_chunks(frequency, alltpwtsgrid[a2, a1, :],
                                                     frequency_average[a2, a1])
                frequency_chunk_len[a2, a1] = frequency_chunks.shape[0]
                nrows = time_chunk_len[a2, a1] * frequency_chunk_len[a2, a1]
                cnvis += nrows

    # Now we know enough to define the output coalesced arrays. The output rows are in
    # successive a1, a2 order: [len_time_chunks[a2,a1], a2, a1, len_frequency_chunks[a2,a1]].
    ctime = numpy.zeros([cnvis])
    cfrequency = numpy.zeros([cnvis])
    cchannel_bandwidth = numpy.zeros([cnvis])
    cvis = numpy.zeros([cnvis, npol], dtype='complex')
    cwts = numpy.zeros([cnvis, npol])
    cimwts = numpy.zeros([cnvis, npol])
    cuvw = numpy.zeros([cnvis, 3])
    ca1 = numpy.zeros([cnvis], dtype='int')
    ca2 = numpy.zeros([cnvis], dtype='int')
    cintegration_time = numpy.zeros([cnvis])

    # For decoalescence we keep an index to map back to the original BlockVisibility:
    # rowgrid enumerates the block elements [ntimes, nant, nant, nchan] in C order.
    rowgrid = numpy.zeros([ntimes, nant, nant, nchan], dtype='int')
    rowgrid.flat = range(rowgrid.size)
    cindex = numpy.zeros([rowgrid.size], dtype='int')

    # Now go through, chunking up the various arrays. Everything is converted into an
    # array with axes [time, channel] and averaged over the time and frequency chunks
    # for this baseline. The cindex many-to-one map lets decoalescence extract the
    # coalesced value each input element contributed towards.
    visstart = 0
    for a2 in ua:
        for a1 in ua:
            nrows = time_chunk_len[a2, a1] * frequency_chunk_len[a2, a1]
            rows = slice(visstart, visstart + nrows)
            # NOTE(review): the left side indexes ntimes*nchan elements while the right
            # side has nrows; this relies on broadcasting and looks fragile when
            # nrows != nchan — verify against the decoalescence path.
            cindex.flat[rowgrid[:, a2, a1, :]] = numpy.array(range(visstart, visstart + nrows))
            ca1[rows] = a1
            ca2[rows] = a2

            # Average over time and frequency for quantities where polarisation
            # isn't an issue.
            def average_from_grid(arr):
                return average_chunks2(arr, allpwtsgrid[:, a2, a1, :],
                                       (time_average[a2, a1], frequency_average[a2, a1]))[0]

            ctime[rows] = average_from_grid(time_grid).flatten()
            cfrequency[rows] = average_from_grid(frequency_grid).flatten()

            # uvw is averaged in wavelength units (scaled by frequency/c) per axis.
            for axis in range(3):
                uvwgrid = numpy.outer(uvw[:, a2, a1, axis], frequency / constants.c.value)
                cuvw[rows, axis] = average_from_grid(uvwgrid).flatten()

            # For some variables we need the sum, not the average.
            def sum_from_grid(arr):
                result = average_chunks2(arr, allpwtsgrid[:, a2, a1, :],
                                         (time_average[a2, a1], frequency_average[a2, a1]))
                return result[0] * result[0].size

            cintegration_time[rows] = sum_from_grid(integration_time_grid).flatten()
            cchannel_bandwidth[rows] = sum_from_grid(channel_bandwidth_grid).flatten()

            # For the polarisations we have to perform the time-frequency average
            # separately for each polarisation.
            for pol in range(npol):
                result = average_chunks2(vis[:, a2, a1, :, pol], wts[:, a2, a1, :, pol],
                                         (time_average[a2, a1], frequency_average[a2, a1]))
                cvis[rows, pol], cwts[rows, pol] = result[0].flatten(), result[1].flatten()

            # Now do the imaging weights (weighted by the natural weights).
            for pol in range(npol):
                result = average_chunks2(imaging_wts[:, a2, a1, :, pol], wts[:, a2, a1, :, pol],
                                         (time_average[a2, a1], frequency_average[a2, a1]))
                cimwts[rows, pol] = result[0].flatten()

            visstart += nrows

    assert cnvis == visstart, "Mismatch between number of rows in coalesced visibility %d and index %d" % \
                              (cnvis, visstart)

    return cvis, cuvw, cwts, cimwts, ctime, cfrequency, cchannel_bandwidth, ca1, ca2, cintegration_time, cindex
def convert_blocks(vis, uvw, wts, imaging_wts, times, integration_time, frequency, channel_bandwidth):
    """ Convert BlockVisibility-layout arrays to Visibility-style rows with no averaging.

    Only the lower triangle of baselines (a2 > a1) is emitted; rows are ordered
    time-major, then (a2, a1), then channel — matching C-order boolean masking.

    :param vis: complex visibilities [ntimes, nant, nant, nchan, npol]
    :param uvw: uvw coordinates [ntimes, nant, nant, 3]
    :param wts: weights, same shape as vis
    :param imaging_wts: imaging weights, same shape as vis
    :param times: time of each integration [ntimes]
    :param integration_time: integration time per sample [ntimes]
    :param frequency: channel frequencies [nchan]
    :param channel_bandwidth: bandwidth per channel [nchan]
    :return: (cvis, cuvw, cwts, cimaging_weights, ctime, cfrequency, cchannel_bandwidth,
              ca1, ca2, cintegration_time, cindex)
    """
    # The input visibility is a block of shape [ntimes, nant, nant, nchan, npol]. We map
    # this into rows of vis[npol] with additional columns antenna1, antenna2, frequency.
    ntimes, nant, _, nchan, npol = vis.shape
    assert nchan == len(frequency)
    # One row per (time, unordered baseline, channel).
    cnvis = ntimes * nant * (nant - 1) * nchan // 2

    # NOTE(review): these initial zeros are dead — ca1/ca2 are rebound from
    # rowgrid_mask columns below.
    ca1 = numpy.zeros([cnvis], dtype='int')
    ca2 = numpy.zeros([cnvis], dtype='int')

    # For decoalescence we keep an index to map back to the original BlockVisibility:
    # rowgrid enumerates block elements [ntimes, nant, nant, nchan] in C order.
    rowgrid = numpy.zeros([ntimes, nant, nant, nchan], dtype='int')
    rowgrid.flat = range(rowgrid.size)
    cindex = numpy.zeros([rowgrid.size], dtype='int')

    # Boolean masks selecting the lower-triangle (a2 > a1) baselines of each array.
    mask_rowgrid = numpy.zeros_like(rowgrid, dtype='bool')
    mask_uvw = numpy.zeros_like(uvw, dtype='bool')
    mask_vis = numpy.zeros_like(vis, dtype='bool')
    mask_wts = numpy.zeros_like(wts, dtype='bool')
    mask_imaging_wts = numpy.zeros_like(imaging_wts, dtype='bool')

    for a1 in range(nant):
        for a2 in range(a1 + 1, nant):
            mask_uvw[:, a2, a1, :] = True
            mask_vis[:, a2, a1, :, :] = True
            mask_wts[:, a2, a1, :, :] = True
            mask_imaging_wts[:, a2, a1, :, :] = True
            mask_rowgrid[:, a2, a1, :] = True

    # argwhere yields the selected (itime, a2, a1, chan) tuples in C order, i.e. the
    # same order as the boolean-masked data extractions below.
    rowgrid_mask = numpy.argwhere(mask_rowgrid == True)
    ca2 = rowgrid_mask[:, 1]
    ca1 = rowgrid_mask[:, 2]
    # Reverse index: block element -> output row number.
    cindex.flat[rowgrid[rowgrid_mask[:, 0], rowgrid_mask[:, 1], rowgrid_mask[:, 2],
                        rowgrid_mask[:, 3]]] = range(cnvis)

    # Per-row metadata: channel varies fastest (tile), time slowest (repeat).
    cfrequency = numpy.tile(frequency, ntimes * nant * (nant - 1) // 2)
    cchannel_bandwidth = numpy.tile(channel_bandwidth, ntimes * nant * (nant - 1) // 2)
    ctime = numpy.repeat(times, nchan * nant * (nant - 1) // 2)
    cintegration_time = numpy.repeat(integration_time, nchan * nant * (nant - 1) // 2)

    # Replicate each baseline uvw across channels, then scale to wavelengths by f/c.
    cuvw = (numpy.tile(uvw[mask_uvw].reshape(-1, 3), nchan)).reshape(-1, 3)
    freq = numpy.repeat(cfrequency, 3).reshape(-1, 3)
    cuvw[..., :] *= freq[:] / constants.c.value

    # Extract the data rows; boolean masking flattens in the same C order as above.
    cvis = vis[mask_vis].reshape(-1, npol)
    cwts = wts[mask_wts].reshape(-1, npol)
    cimaging_weights = imaging_wts[mask_imaging_wts].reshape(-1, npol)

    return cvis, cuvw, cwts, cimaging_weights, ctime, cfrequency, cchannel_bandwidth, ca1, ca2, \
           cintegration_time, cindex
def convert_visibility_to_blockvisibility(vis: Visibility) -> BlockVisibility:
    """Convert a Visibility to the equivalent BlockVisibility format.

    A BlockVisibility passes through untouched; anything else is decoalesced.

    :param vis: Coalesced visibility
    :return: BlockVisibility
    """
    # Already in block format: nothing to do.
    if isinstance(vis, BlockVisibility):
        return vis
    return decoalesce_visibility(vis)