""" Functions for calibration, including creation of gaintables, application of gaintables, and
merging gaintables.
"""
import copy
import logging
import numpy.linalg
from data_models.memory_data_models import GainTable, BlockVisibility, QA, assert_vis_gt_compatible
from data_models.memory_data_models import ReceptorFrame
from ..visibility.iterators import vis_timeslice_iter
log = logging.getLogger(__name__)
[docs]def gaintable_summary(gt: GainTable):
"""Return string summarizing the Gaintable
"""
return "%s rows, %.3f GB" % (gt.data.shape, gt.size())
[docs]def create_gaintable_from_blockvisibility(vis: BlockVisibility, timeslice=None,
frequencyslice: float = None, **kwargs) -> GainTable:
""" Create gain table from visibility.
This makes an empty gain table consistent with the BlockVisibility.
:param vis: BlockVisibilty
:param timeslice: Time interval between solutions (s)
:param frequency_width: Frequency solution width (Hz)
:return: GainTable
"""
assert isinstance(vis, BlockVisibility), "vis is not a BlockVisibility: %r" % vis
nants = vis.nants
if timeslice is None or timeslice == 'auto':
timeslice = numpy.min(vis.integration_time)
utimes = timeslice * numpy.unique(numpy.round((vis.time - vis.time[0]) / timeslice))
ntimes = len(utimes)
gain_interval = timeslice * numpy.ones([ntimes])
# log.debug('create_gaintable_from_blockvisibility: times are %s' % str(utimes))
# log.debug('create_gaintable_from_blockvisibility: intervals are %s' % str(gain_interval))
ntimes = len(utimes)
ufrequency = numpy.unique(vis.frequency)
nfrequency = len(ufrequency)
receptor_frame = ReceptorFrame(vis.polarisation_frame.type)
nrec = receptor_frame.nrec
gainshape = [ntimes, nants, nfrequency, nrec, nrec]
gain = numpy.ones(gainshape, dtype='complex')
if nrec > 1:
gain[..., 0, 1] = 0.0
gain[..., 1, 0] = 0.0
gain_weight = numpy.ones(gainshape)
gain_time = utimes + vis.time[0]
gain_frequency = ufrequency
gain_residual = numpy.zeros([ntimes, nfrequency, nrec, nrec])
gt = GainTable(gain=gain, time=gain_time, interval=gain_interval, weight=gain_weight, residual=gain_residual,
frequency=gain_frequency, receptor_frame=receptor_frame, phasecentre=vis.phasecentre,
configuration=vis.configuration)
assert isinstance(gt, GainTable), "gt is not a GainTable: %r" % gt
assert_vis_gt_compatible(vis, gt)
return gt
[docs]def apply_gaintable(vis: BlockVisibility, gt: GainTable, inverse=False, vis_slices=None, **kwargs) -> BlockVisibility:
"""Apply a gain table to a block visibility
The corrected visibility is::
V_corrected = {g_i * g_j^*}^-1 V_obs
If the visibility data are polarised e.g. polarisation_frame("linear") then the inverse operator
represents an actual inverse of the gains.
:param vis: Visibility to have gains applied
:param gt: Gaintable to be applied
:param inverse: Apply the inverse (default=False)
:return: input vis with gains applied
"""
assert isinstance(vis, BlockVisibility), "vis is not a BlockVisibility: %r" % vis
assert isinstance(gt, GainTable), "gt is not a GainTable: %r" % gt
assert_vis_gt_compatible(vis, gt)
if inverse:
log.debug('apply_gaintable: Apply inverse gaintable')
else:
log.debug('apply_gaintable: Apply gaintable')
is_scalar = gt.gain.shape[-2:] == (1, 1)
if is_scalar:
log.debug('apply_gaintable: scalar gains')
for chunk, rows in enumerate(vis_timeslice_iter(vis, vis_slices=vis_slices)):
if numpy.sum(rows) > 0:
vistime = numpy.average(vis.time[rows])
gaintable_rows = abs(gt.time - vistime) < gt.interval / 2.0
# Lookup the gain for this set of visibilities
gain = gt.data['gain'][gaintable_rows]
gainwt = gt.data['weight'][gaintable_rows]
# The shape of the mueller matrix is
ntimes, nant, nchan, nrec, _ = gain.shape
original = vis.vis[rows]
originalwt = vis.weight[rows]
applied = copy.deepcopy(original)
appliedwt = copy.deepcopy(originalwt)
for time in range(ntimes):
antantwt = numpy.outer(gainwt, gainwt)
if is_scalar:
if inverse:
lgain = numpy.ones_like(gain)
lgain[numpy.abs(gain) > 0.0] = 1.0 / gain[numpy.abs(gain) > 0.0]
else:
lgain = gain
clgain = numpy.conjugate(lgain)
for chan in range(nchan):
smueller = numpy.ma.outer(lgain[time, :, chan, 0],
clgain[time, :, chan, 0]).reshape([nant, nant])
applied[time, :, :, chan, 0] = original[time, :, :, chan, 0] * smueller
antantwt = numpy.outer(gainwt[time, :, chan, 0, 0], gainwt[time, :, chan, 0, 0])
applied[time, :, :, chan, 0][antantwt == 0.0] = 0.0
appliedwt[time, :, :, chan, 0][antantwt == 0.0] = 0.0
else:
for a1 in range(vis.nants - 1):
for a2 in range(a1 + 1, vis.nants):
for chan in range(nchan):
mueller = numpy.kron(gain[time, a1, chan, :, :],
numpy.conjugate(gain[time, a2, chan, :, :]))
if inverse:
# If the Mueller is singular, ignore it
try:
mueller = numpy.linalg.inv(mueller)
applied[time, a2, a1, chan, :] = \
numpy.matmul(mueller, original[time, a2, a1, chan, :])
except numpy.linalg.linalg.LinAlgError:
applied[time, a2, a1, chan, :] = \
original[time, a2, a1, chan, :]
else:
applied[time, a2, a1, chan, :] = \
numpy.matmul(mueller, original[time, a2, a1, chan, :])
if (gainwt[time, a1, chan, 0, 0] <= 0.0) or (gainwt[time, a1, chan, 0, 0] <= 0.0):
applied[time, a2, a1, chan, 0] = 0.0
appliedwt[time, a2, a1, chan, 0] = 0.0
vis.data['vis'][rows] = applied
return vis
[docs]def append_gaintable(gt: GainTable, othergt: GainTable) -> GainTable:
"""Append othergt to gt
:param gt:
:param othergt:
:return: GainTable gt + othergt
"""
assert gt.receptor_frame == othergt.receptor_frame
gt.data = numpy.hstack((gt.data, othergt.data))
return gt
[docs]def copy_gaintable(gt: GainTable, zero=False):
"""Copy a GainTable
Performs a deepcopy of the data array
"""
if gt is None:
return gt
assert isinstance(gt, GainTable), gt
newgt = copy.copy(gt)
newgt.data = copy.deepcopy(gt.data)
if zero:
newgt.data['gt'][...] = 0.0
return newgt
[docs]def create_gaintable_from_rows(gt: GainTable, rows: numpy.ndarray, makecopy=True) -> GainTable:
""" Create a GainTable from selected rows
:param gt: GainTable
:param rows: Boolean array of row selection
:param makecopy: Make a deep copy (True)
:return: GainTable
"""
if rows is None or numpy.sum(rows) == 0:
return None
assert len(rows) == gt.ntimes, "Length of rows does not agree with length of GainTable"
assert isinstance(gt, GainTable), gt
if makecopy:
newgt = copy_gaintable(gt)
newgt.data = copy.deepcopy(gt.data[rows])
return newgt
else:
gt.data = copy.deepcopy(gt.data[rows])
return gt
[docs]def qa_gaintable(gt: GainTable, context=None) -> QA:
"""Assess the quality of a gaintable
:param gt:
:return: AQ
"""
agt = numpy.abs(gt.gain[gt.weight > 0.0])
pgt = numpy.angle(gt.gain[gt.weight > 0.0])
rgt = gt.residual[numpy.sum(gt.weight, axis=1) > 0.0]
data = {'shape': gt.gain.shape,
'maxabs-amp': numpy.max(agt),
'minabs-amp': numpy.min(agt),
'rms-amp': numpy.std(agt),
'medianabs-amp': numpy.median(agt),
'maxabs-phase': numpy.max(pgt),
'minabs-phase': numpy.min(pgt),
'rms-phase': numpy.std(pgt),
'medianabs-phase': numpy.median(pgt),
'residual': numpy.max(rgt)
}
return QA(origin='qa_gaintable', data=data, context=context)
[docs]def gaintable_plot(gt: GainTable, ax, title='', value='amp', ants=None, channels=None,
label_max=10, **kwargs):
""" Standard plot of gain table
:param gt: Gaintable
:param ax: matplotlib axes
:param value: 'amp' or 'phase' or 'residual'
:param ants: Antennas to plot
:param channels: Channels to plot
:param kwargs:
:return:
"""
if ants is None:
ants = range(gt.nants)
if channels is None:
channels = range(gt.nchan)
if value == "residual":
residual = gt.residual[:, channels, 0, 0]
ax.plot(gt.time, residual, '.')
else:
for ant in ants:
if gt.configuration is not None:
label = gt.configuration.names[ant]
else:
label = ''
amp = numpy.abs(gt.gain[:, ant, channels, 0, 0])
if value == 'amp':
ax.plot(gt.time, amp, '.', label=label)
else:
angle = numpy.angle(gt.gain[:, ant, channels, 0, 0])
ax.plot(gt.time, angle, '.', label=label)
if gt.configuration is not None:
if len(gt.configuration.names) < label_max:
ax.legend()
ax.set_title(title)
ax.set_xlabel('Time (s)')
ax.set_ylabel(value)