Source code for processing_library.calibration.solvers

""" Functions to solve for antenna/station gain

This uses an iterative substitution algorithm due to Larry D'Addario c 1980'ish. Used
in the original VLA Dec-10 Antsol.


For example::

    gtsol = solve_gaintable(vis, originalvis, phase_only=True, niter=niter, crosspol=False, tol=1e-6)
    vis = apply_gaintable(vis, gtsol, inverse=True)
 

"""

__all__ = ['solution_residual_matrix', 'solution_residual_scalar', 'solution_residual_vector',
           'solve_antenna_gains_itsubs_matrix', 'solve_antenna_gains_itsubs_scalar',
           'solve_antenna_gains_itsubs_vector',
           'gain_substitution_matrix', 'gain_substitution_vector', 'gain_substitution_vector',
           'gain_substitution_scalar', 'solve_from_X']

import logging

import numpy

from data_models.memory_data_models import GainTable

log = logging.getLogger(__name__)


[docs]def solve_from_X(gt: GainTable, x: numpy.ndarray, xwt: numpy.ndarray, chunk, crosspol, niter, phase_only, tol, npol) \ -> GainTable: """ Solve for gains from the point source equivalents :param gt: :param x: point source visibility :param xwt: point source weight :param chunk: which chunk of the gaintable? :param crosspol: :param niter: :param phase_only: :param tol: :param npol: :return: """ if npol > 1: if crosspol: gt.data['gain'][chunk, ...], gt.data['weight'][chunk, ...], gt.data['residual'][chunk, ...] = \ solve_antenna_gains_itsubs_matrix(gt.data['gain'][chunk, ...], gt.data['weight'][chunk, ...], x, xwt, phase_only=phase_only, niter=niter, tol=tol) else: gt.data['gain'][chunk, ...], gt.data['weight'][chunk, ...], gt.data['residual'][chunk, ...] = \ solve_antenna_gains_itsubs_vector(gt.data['gain'][chunk, ...], gt.data['weight'][chunk, ...], x, xwt, phase_only=phase_only, niter=niter, tol=tol) else: gt.data['gain'][chunk, ...], gt.data['weight'][chunk, ...], gt.data['residual'][chunk, ...] = \ solve_antenna_gains_itsubs_scalar(gt.data['gain'][chunk, ...], gt.data['weight'][chunk, ...], x, xwt, phase_only=phase_only, niter=niter, tol=tol) return gt
[docs]def solve_antenna_gains_itsubs_scalar(gain, gwt, x, xwt, niter=30, tol=1e-8, phase_only=True, refant=0, damping=0.5): """Solve for the antenna gains x(antenna2, antenna1) = gain(antenna1) conj(gain(antenna2)) This uses an iterative substitution algorithm due to Larry D'Addario c 1980'ish (see ThompsonDaddario1982 Appendix 1). Used in the original VLA Dec-10 Antsol. :param gain: gains :param gwt: gain weight :param x: Equivalent point source visibility[nants, nants, ...] :param xwt: Equivalent point source weight [nants, nants, ...] :param niter: Number of iterations :param tol: tolerance on solution change :param phase_only: Do solution for only the phase? (default True) :param refant: Reference antenna for phase (default=0) :return: gain [nants, ...], weight [nants, ...] """ nants = x.shape[0] for ant1 in range(nants): x[ant1, ant1, ...] = 0.0 xwt[ant1, ant1, ...] = 0.0 for ant2 in range(ant1 + 1, nants): x[ant1, ant2, ...] = numpy.conjugate(x[ant2, ant1, ...]) xwt[ant1, ant2, ...] = xwt[ant2, ant1, ...] for iter in range(niter): gainLast = gain gain, gwt = gain_substitution_scalar(gain, x, xwt) if phase_only: mask = numpy.abs(gain) > 0.0 gain[mask] = gain[mask] / numpy.abs(gain[mask]) angles = numpy.angle(gain) gain *= numpy.exp(-1j * angles)[refant, ...] gain = (1.0 - damping) * gain + damping * gainLast change = numpy.max(numpy.abs(gain - gainLast)) if change < tol: return gain, gwt, solution_residual_scalar(gain, x, xwt) return gain, gwt, solution_residual_scalar(gain, x, xwt)
[docs]def gain_substitution_scalar(gain, x, xwt): nants, nchan, nrec, _ = gain.shape newgain = numpy.ones_like(gain, dtype='complex') gwt = numpy.zeros_like(gain, dtype='float') x = x.reshape([nants, nants, nchan, nrec, nrec]) xwt = xwt.reshape([nants, nants, nchan, nrec, nrec]) xxwt = x[:, :, :, 0, 0] * xwt[:, :, :, 0, 0] cgain = numpy.conjugate(gain) gcg = gain[:, :, 0, 0] * cgain[:, :, 0, 0] for ant1 in range(nants): top = numpy.sum(gain[:, :, 0, 0] * xxwt[:, ant1, :], axis=0) bot = numpy.sum((gcg[:, :] * xwt[:, ant1, :, 0, 0]).real, axis=0) if bot.all() > 0.0: newgain[ant1, :, 0, 0] = top / bot gwt[ant1, :, 0, 0] = bot else: newgain[ant1, :, 0, 0] = 0.0 gwt[ant1, :, 0, 0] = 0.0 return newgain, gwt
[docs]def solve_antenna_gains_itsubs_vector(gain, gwt, x, xwt, niter=30, tol=1e-8, phase_only=True, refant=0): """Solve for the antenna gains using full matrix expressions x(antenna2, antenna1) = gain(antenna1) conj(gain(antenna2)) See Appendix D, section D.1 in: J. P. Hamaker, “Understanding radio polarimetry - IV. The full-coherency analogue of scalar self-calibration: Self-alignment, dynamic range and polarimetric fidelity,” Astronomy and Astrophysics Supplement Series, vol. 143, no. 3, pp. 515–534, May 2000. :param gain: gains :param gwt: gain weight :param x: Equivalent point source visibility[nants, nants, ...] :param xwt: Equivalent point source weight [nants, nants, ...] :param niter: Number of iterations :param tol: tolerance on solution change :param phase_only: Do solution for only the phase? (default True) :param refant: Reference antenna for phase (default=0.0) :return: gain [nants, ...], weight [nants, ...] """ nants, _, nchan, npol = x.shape assert npol == 4 newshape = (nants, nants, nchan, 2, 2) x = x.reshape(newshape) xwt = xwt.reshape(newshape) for ant1 in range(nants): x[ant1, ant1, ...] = 0.0 xwt[ant1, ant1, ...] = 0.0 for ant2 in range(ant1 + 1, nants): x[ant1, ant2, ...] = numpy.conjugate(x[ant2, ant1, ...]) xwt[ant1, ant2, ...] = xwt[ant2, ant1, ...] gain[..., 0, 1] = 0.0 gain[..., 1, 0] = 0.0 for iter in range(niter): gainLast = gain gain, gwt = gain_substitution_vector(gain, x, xwt) for rec in [0, 1]: gain[..., rec, 1 - rec] = 0.0 if phase_only: gain[..., rec, rec] = gain[..., rec, rec] / numpy.abs(gain[..., rec, rec]) gain[..., rec, rec] *= numpy.conjugate(gain[refant, ..., rec, rec]) / numpy.abs(gain[refant, ..., rec, rec]) change = numpy.max(numpy.abs(gain - gainLast)) gain = 0.5 * (gain + gainLast) if change < tol: return gain, gwt, solution_residual_vector(gain, x, xwt) return gain, gwt, solution_residual_vector(gain, x, xwt)
[docs]def gain_substitution_vector(gain, x, xwt): nants, nchan, nrec, _ = gain.shape newgain = numpy.ones_like(gain, dtype='complex') if nrec > 0: newgain[..., 0, 1] = 0.0 newgain[..., 1, 0] = 0.0 gwt = numpy.zeros_like(gain, dtype='float') # We are going to work with Jones 2x2 matrix formalism so everything has to be # converted to that format x = x.reshape(nants, nants, nchan, nrec, nrec) xwt = xwt.reshape(nants, nants, nchan, nrec, nrec) if nrec > 0: gain[..., 0, 1] = 0.0 gain[..., 1, 0] = 0.0 for ant1 in range(nants): for chan in range(nchan): # Loop over e.g. 'RR', 'LL, or 'xx', 'YY' ignoring cross terms for rec in range(nrec): top = numpy.sum(x[:, ant1, chan, rec, rec] * gain[:, chan, rec, rec] * xwt[:, ant1, chan, rec, rec], axis=0) bot = numpy.sum((gain[:, chan, rec, rec] * numpy.conjugate(gain[:, chan, rec, rec]) * xwt[:, ant1, chan, rec, rec]).real, axis=0) if bot > 0.0: newgain[ant1, chan, rec, rec] = top / bot gwt[ant1, chan, rec, rec] = bot else: newgain[ant1, chan, rec, rec] = 0.0 gwt[ant1, chan, rec, rec] = 0.0 return newgain, gwt
[docs]def solve_antenna_gains_itsubs_matrix(gain, gwt, x, xwt, niter=30, tol=1e-8, phase_only=True, refant=0): """Solve for the antenna gains using full matrix expressions x(antenna2, antenna1) = gain(antenna1) conj(gain(antenna2)) See Appendix D, section D.1 in: J. P. Hamaker, “Understanding radio polarimetry - IV. The full-coherency analogue of scalar self-calibration: Self-alignment, dynamic range and polarimetric fidelity,” Astronomy and Astrophysics Supplement Series, vol. 143, no. 3, pp. 515–534, May 2000. :param gain: gains :param gwt: gain weight :param x: Equivalent point source visibility[nants, nants, ...] :param xwt: Equivalent point source weight [nants, nants, ...] :param niter: Number of iterations :param tol: tolerance on solution change :param phase_only: Do solution for only the phase? (default True) :param refant: Reference antenna for phase (default=0.0) :return: gain [nants, ...], weight [nants, ...] """ nants, _, nchan, npol = x.shape assert npol == 4 newshape = (nants, nants, nchan, 2, 2) x = x.reshape(newshape) xwt = xwt.reshape(newshape) for ant1 in range(nants): x[ant1, ant1, ...] = 0.0 xwt[ant1, ant1, ...] = 0.0 for ant2 in range(ant1 + 1, nants): x[ant1, ant2, ...] = numpy.conjugate(x[ant2, ant1, ...]) xwt[ant1, ant2, ...] = xwt[ant2, ant1, ...] gain[..., 0, 1] = 0.0 gain[..., 1, 0] = 0.0 for iter in range(niter): gainLast = gain gain, gwt = gain_substitution_matrix(gain, x, xwt) if phase_only: gain = gain / numpy.abs(gain) change = numpy.max(numpy.abs(gain - gainLast)) gain = 0.5 * (gain + gainLast) if change < tol: return gain, gwt, solution_residual_matrix(gain, x, xwt) return gain, gwt, solution_residual_matrix(gain, x, xwt)
[docs]def gain_substitution_matrix(gain, x, xwt): nants, nchan, nrec, _ = gain.shape newgain = numpy.ones_like(gain, dtype='complex') gwt = numpy.zeros_like(gain, dtype='float') # We are going to work with Jones 2x2 matrix formalism so everything has to be # converted to that format x = x.reshape([nants, nants, nchan, nrec, nrec]) xwt = xwt.reshape([nants, nants, nchan, nrec, nrec]) # Write these loops out explicitly. Derivation of these vector equations is tedious but they are # structurally identical to the scalar case with the following changes # Vis -> 2x2 coherency vector, g-> 2x2 Jones matrix, *-> matmul, conjugate->Hermitean transpose (.H) for ant1 in range(nants): for chan in range(nchan): top = 0.0 bot = 0.0 for ant2 in range(nants): if ant1 != ant2: xmat = x[ant2, ant1, chan] xwtmat = xwt[ant2, ant1, chan] g2 = gain[ant2, chan] top += xmat * xwtmat * g2 bot += numpy.conjugate(g2) * xwtmat * g2 newgain[ant1, chan][bot > 0.0] = top[bot > 0.0] / bot[bot > 0.0] newgain[ant1, chan][bot <= 0.0] = 0.0 gwt[ant1, chan] = bot.real return newgain, gwt
[docs]def solution_residual_scalar(gain, x, xwt): """Calculate residual across all baselines of gain for point source equivalent visibilities :param gain: gain [nant, ...] :param x: Point source equivalent visibility [nant, ...] :param xwt: Point source equivalent weight [nant, ...] :return: residual[...] """ nant, nchan, nrec, _ = gain.shape x = x.reshape(nant, nant, nchan, nrec, nrec) xwt = xwt.reshape(nant, nant, nchan, nrec, nrec) residual = numpy.zeros([nchan, nrec, nrec]) sumwt = numpy.zeros([nchan, nrec, nrec]) for chan in range(nchan): lgain = gain[:, chan, 0, 0] clgain = numpy.conjugate(lgain) smueller = numpy.ma.outer(clgain, lgain).reshape([nant, nant]) error = x[:, :, chan, 0, 0] - smueller for i in range(nant): error[i, i] = 0.0 residual += numpy.sum(error * xwt[:, :, chan, 0, 0] * numpy.conjugate(error)).real sumwt += numpy.sum(xwt[:, :, chan, 0, 0]) residual[sumwt > 0.0] = numpy.sqrt(residual[sumwt > 0.0] / sumwt[sumwt > 0.0]) residual[sumwt <= 0.0] = 0.0 return residual
[docs]def solution_residual_vector(gain, x, xwt): """Calculate residual across all baselines of gain for point source equivalent visibilities Vector case i.e. off-diagonals of gains are zero :param gain: gain [nant, ...] :param x: Point source equivalent visibility [nant, ...] :param xwt: Point source equivalent weight [nant, ...] :return: residual[...] """ nants, nchan, nrec, _ = gain.shape x = x.reshape(nants, nants, nchan, nrec, nrec) x[..., 1, 0] = 0.0 x[..., 0, 1] = 0.0 xwt = xwt.reshape(nants, nants, nchan, nrec, nrec) xwt[..., 1, 0] = 0.0 xwt[..., 0, 1] = 0.0 residual = numpy.zeros([nchan, nrec, nrec]) sumwt = numpy.zeros([nchan, nrec, nrec]) for ant1 in range(nants): for ant2 in range(nants): for chan in range(nchan): for rec in range(nrec): error = x[ant2, ant1, chan, rec, rec] - \ gain[ant1, chan, rec, rec] * numpy.conjugate(gain[ant2, chan, rec, rec]) residual += (error * xwt[ant2, ant1, chan, rec, rec] * numpy.conjugate(error)).real sumwt += xwt[ant2, ant1, chan, rec, rec] residual[sumwt > 0.0] = numpy.sqrt(residual[sumwt > 0.0] / sumwt[sumwt > 0.0]) residual[sumwt <= 0.0] = 0.0 return residual
[docs]def solution_residual_matrix(gain, x, xwt): """Calculate residual across all baselines of gain for point source equivalent visibilities :param gain: gain [nant, ...] :param x: Point source equivalent visibility [nant, ...] :param xwt: Point source equivalent weight [nant, ...] :return: residual[...] """ nants, _, nchan, nrec, _ = x.shape residual = numpy.zeros([nchan, nrec, nrec]) sumwt = numpy.zeros([nchan, nrec, nrec]) # This is written out in long winded form but should be optimised for # production code! for ant1 in range(nants): for ant2 in range(nants): for chan in range(nchan): for rec1 in range(nrec): for rec2 in range(nrec): error = x[ant2, ant1, chan, rec2, rec1] - \ gain[ant1, chan, rec2, rec1] * numpy.conjugate(gain[ant2, chan, rec2, rec1]) residual[chan, rec2, rec1] += (error * xwt[ant2, ant1, chan, rec2, rec1] * numpy.conjugate( error)).real sumwt[chan, rec2, rec1] += xwt[ant2, ant1, chan, rec2, rec1] residual[sumwt > 0.0] = numpy.sqrt(residual[sumwt > 0.0] / sumwt[sumwt > 0.0]) residual[sumwt <= 0.0] = 0.0 return residual