"""
Base simple visibility operations, placed here to avoid circular dependencies
"""
import os
import copy
import logging
from typing import Union
import numpy
import re
from astropy import constants as constants
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.time import Time
from data_models.memory_data_models import Visibility, BlockVisibility, Configuration
from data_models.polarisation import PolarisationFrame, ReceptorFrame, correlate_polarisation
from processing_library.util.coordinate_support import xyz_to_uvw, uvw_to_xyz, skycoord_to_lmn, simulate_point, \
hadec_to_azel
log = logging.getLogger(__name__)
[docs]def vis_summary(vis: Union[Visibility, BlockVisibility]):
"""Return string summarizing the Visibility
"""
return "%d rows, %.3f GB" % (vis.nvis, vis.size())
[docs]def copy_visibility(vis: Union[Visibility, BlockVisibility], zero=False) -> Union[Visibility, BlockVisibility]:
"""Copy a visibility
Performs a deepcopy of the data array
"""
assert isinstance(vis, Visibility) or isinstance(vis, BlockVisibility), vis
newvis = copy.copy(vis)
newvis.data = numpy.copy(vis.data)
if isinstance(vis, Visibility):
newvis.cindex = vis.cindex
newvis.blockvis = vis.blockvis
if zero:
newvis.data['vis'][...] = 0.0
return newvis
[docs]def create_visibility(config: Configuration, times: numpy.array, frequency: numpy.array,
channel_bandwidth, phasecentre: SkyCoord,
weight: float, polarisation_frame=PolarisationFrame('stokesI'),
integration_time=1.0,
zerow=False, elevation_limit=15.0 * numpy.pi / 180.0, source='unknown', meta=None) -> Visibility:
""" Create a Visibility from Configuration, hour angles, and direction of source
Note that we keep track of the integration time for BDA purposes
:param config: Configuration of antennas
:param times: hour angles in radians
:param frequency: frequencies (Hz] [nchan]
:param weight: weight of a single sample
:param phasecentre: phasecentre of observation
:param channel_bandwidth: channel bandwidths: (Hz] [nchan]
:param integration_time: Integration time ('auto' or value in s)
:param polarisation_frame: PolarisationFrame('stokesI')
:return: Visibility
"""
assert phasecentre is not None, "Must specify phase centre"
if polarisation_frame is None:
polarisation_frame = correlate_polarisation(config.receptor_frame)
latitude = config.location.geodetic[1].to('rad').value
nch = len(frequency)
ants_xyz = config.data['xyz']
nants = len(config.data['names'])
nbaselines = int(nants * (nants - 1) / 2)
ntimes = 0
for iha, ha in enumerate(times):
# Calculate the positions of the antennas as seen for this hour angle
# and declination
_, elevation = hadec_to_azel(ha, phasecentre.dec.rad, latitude)
if elevation_limit is None or (elevation > elevation_limit):
ntimes +=1
npol = polarisation_frame.npol
nrows = nbaselines * ntimes * nch
nrowsperintegration = nbaselines * nch
rvis = numpy.zeros([nrows, npol], dtype='complex')
rweight = weight * numpy.ones([nrows, npol])
rtimes = numpy.zeros([nrows])
rfrequency = numpy.zeros([nrows])
rchannel_bandwidth = numpy.zeros([nrows])
rantenna1 = numpy.zeros([nrows], dtype='int')
rantenna2 = numpy.zeros([nrows], dtype='int')
ruvw = numpy.zeros([nrows, 3])
n_flagged = 0
# Do each hour angle in turn
row = 0
for iha, ha in enumerate(times):
# Calculate the positions of the antennas as seen for this hour angle
# and declination
_, elevation = hadec_to_azel(ha, phasecentre.dec.rad, latitude)
if elevation_limit is None or (elevation > elevation_limit):
rtimes[row:row + nrowsperintegration] = ha * 43200.0 / numpy.pi
# TODO: optimise loop
# Loop over all pairs of antennas. Note that a2>a1
ant_pos = xyz_to_uvw(ants_xyz, ha, phasecentre.dec.rad)
for a1 in range(nants):
for a2 in range(a1 + 1, nants):
rantenna1[row:row + nch] = a1
rantenna2[row:row + nch] = a2
rweight[row:row+nch,...] = 1.0
# Loop over all frequencies and polarisations
for ch in range(nch):
# noinspection PyUnresolvedReferences
k = frequency[ch] / constants.c.value
ruvw[row, :] = (ant_pos[a2, :] - ant_pos[a1, :]) * k
rfrequency[row] = frequency[ch]
rchannel_bandwidth[row] = channel_bandwidth[ch]
row += 1
if zerow:
ruvw[..., 2] = 0.0
assert row == nrows
rintegration_time = numpy.full_like(rtimes, integration_time)
vis = Visibility(uvw=ruvw, time=rtimes, antenna1=rantenna1, antenna2=rantenna2,
frequency=rfrequency, vis=rvis,
weight=rweight, imaging_weight=rweight,
integration_time=rintegration_time, channel_bandwidth=rchannel_bandwidth,
polarisation_frame=polarisation_frame, source=source, meta=meta)
vis.phasecentre = phasecentre
vis.configuration = config
log.info("create_visibility: %s" % (vis_summary(vis)))
assert isinstance(vis, Visibility), "vis is not a Visibility: %r" % vis
if elevation_limit is not None:
log.info('create_visibility: flagged %d/%d visibilities below elevation limit %f (rad)' %
(n_flagged, vis.nvis, elevation_limit))
else:
log.info('create_visibility: created %d visibilities' % (vis.nvis))
return vis
[docs]def create_blockvisibility(config: Configuration,
times: numpy.array,
frequency: numpy.array,
phasecentre: SkyCoord,
weight: float = 1.0,
polarisation_frame: PolarisationFrame = None,
integration_time=1.0,
channel_bandwidth=1e6,
zerow=False,
elevation_limit=None,
source='unknown',
meta=None,
**kwargs) -> BlockVisibility:
""" Create a BlockVisibility from Configuration, hour angles, and direction of source
Note that we keep track of the integration time for BDA purposes
:param config: Configuration of antennas
:param times: hour angles in radians
:param frequency: frequencies (Hz] [nchan]
:param weight: weight of a single sample
:param phasecentre: phasecentre of observation
:param channel_bandwidth: channel bandwidths: (Hz] [nchan]
:param integration_time: Integration time ('auto' or value in s)
:param polarisation_frame:
:return: BlockVisibility
"""
assert phasecentre is not None, "Must specify phase centre"
if polarisation_frame is None:
polarisation_frame = correlate_polarisation(config.receptor_frame)
latitude = config.location.geodetic[1].to('rad').value
nch = len(frequency)
ants_xyz = config.data['xyz']
nants = len(config.data['names'])
ntimes = 0
n_flagged = 0
for iha, ha in enumerate(times):
# Calculate the positions of the antennas as seen for this hour angle
# and declination
_, elevation = hadec_to_azel(ha, phasecentre.dec.rad, latitude)
if elevation_limit is None or (elevation > elevation_limit):
ntimes +=1
else:
n_flagged += 1
assert ntimes > 0, "No unflagged points"
if elevation_limit is not None:
log.info('create_visibility: flagged %d/%d times below elevation limit %f (rad)' %
(n_flagged, ntimes, elevation_limit))
else:
log.info('create_visibility: created %d times' % (ntimes))
npol = polarisation_frame.npol
visshape = [ntimes, nants, nants, nch, npol]
rvis = numpy.zeros(visshape, dtype='complex')
rweight = weight * numpy.ones(visshape)
rimaging_weight = numpy.ones(visshape)
rtimes = numpy.zeros([ntimes])
ruvw = numpy.zeros([ntimes, nants, nants, 3])
# Do each hour angle in turn
itime = 0
for iha, ha in enumerate(times):
# Calculate the positions of the antennas as seen for this hour angle
# and declination
ant_pos = xyz_to_uvw(ants_xyz, ha, phasecentre.dec.rad)
_, elevation = hadec_to_azel(ha, phasecentre.dec.rad, latitude)
if elevation_limit is None or (elevation > elevation_limit):
rtimes[itime] = ha * 43200.0 / numpy.pi
rweight[itime, ...] = 1.0
# Loop over all pairs of antennas. Note that a2>a1
for a1 in range(nants):
for a2 in range(a1 + 1, nants):
ruvw[itime, a2, a1, :] = (ant_pos[a2, :] - ant_pos[a1, :])
ruvw[itime, a1, a2, :] = (ant_pos[a1, :] - ant_pos[a2, :])
itime += 1
rintegration_time = numpy.full_like(rtimes, integration_time)
rchannel_bandwidth = channel_bandwidth
if zerow:
ruvw[..., 2] = 0.0
vis = BlockVisibility(uvw=ruvw, time=rtimes, frequency=frequency, vis=rvis, weight=rweight,
imaging_weight=rimaging_weight,
integration_time=rintegration_time, channel_bandwidth=rchannel_bandwidth,
polarisation_frame=polarisation_frame, source=source, meta=meta)
vis.phasecentre = phasecentre
vis.configuration = config
log.info("create_blockvisibility: %s" % (vis_summary(vis)))
assert isinstance(vis, BlockVisibility), "vis is not a BlockVisibility: %r" % vis
return vis
[docs]def create_visibility_from_rows(vis: Union[Visibility, BlockVisibility], rows: numpy.ndarray, makecopy=True):
""" Create a Visibility from selected rows
:param vis: Visibility
:param rows: Boolean array of row selction
:param makecopy: Make a deep copy (True)
:return: Visibility
"""
if rows is None or numpy.sum(rows) == 0:
return None
assert len(rows) == vis.nvis, "Length of rows does not agree with length of visibility"
if isinstance(vis, Visibility):
if makecopy:
newvis = copy_visibility(vis)
if vis.cindex is not None and len(rows) == len(vis.cindex):
newvis.cindex = vis.cindex[rows]
else:
newvis.cindex = None
if vis.blockvis is not None:
newvis.blockvis = vis.blockvis
newvis.data = copy.deepcopy(vis.data[rows])
return newvis
else:
vis.data = copy.deepcopy(vis.data[rows])
if vis.cindex is not None:
vis.cindex = vis.cindex[rows]
return vis
else:
if makecopy:
newvis = copy_visibility(vis)
newvis.data = copy.deepcopy(vis.data[rows])
return newvis
else:
vis.data = copy.deepcopy(vis.data[rows])
return vis
[docs]def phaserotate_visibility(vis: Visibility, newphasecentre: SkyCoord, tangent=True, inverse=False) -> Visibility:
"""
Phase rotate from the current phase centre to a new phase centre
If tangent is False the uvw are recomputed and the visibility phasecentre is updated.
Otherwise only the visibility phases are adjusted
:param vis: Visibility to be rotated
:param newphasecentre:
:param tangent: Stay on the same tangent plane? (True)
:param inverse: Actually do the opposite
:return: Visibility
"""
l, m, n = skycoord_to_lmn(newphasecentre, vis.phasecentre)
# No significant change?
if numpy.abs(n) < 1e-15:
return vis
# Make a new copy
newvis = copy_visibility(vis)
if isinstance(vis, Visibility):
phasor = simulate_point(newvis.uvw, l, m)
if len(newvis.vis.shape) > len(phasor.shape):
phasor = phasor[:, numpy.newaxis]
if inverse:
newvis.data['vis'] *= phasor
else:
newvis.data['vis'] *= numpy.conj(phasor)
# To rotate UVW, rotate into the global XYZ coordinate system and back. We have the option of
# staying on the tangent plane or not. If we stay on the tangent then the raster will
# join smoothly at the edges. If we change the tangent then we will have to reproject to get
# the results on the same image, in which case overlaps or gaps are difficult to deal with.
if not tangent:
if inverse:
xyz = uvw_to_xyz(vis.data['uvw'], ha=-newvis.phasecentre.ra.rad, dec=newvis.phasecentre.dec.rad)
newvis.data['uvw'][...] = \
xyz_to_uvw(xyz, ha=-newphasecentre.ra.rad, dec=newphasecentre.dec.rad)[...]
else:
# This is the original (non-inverse) code
xyz = uvw_to_xyz(newvis.data['uvw'], ha=-newvis.phasecentre.ra.rad, dec=newvis.phasecentre.dec.rad)
newvis.data['uvw'][...] = xyz_to_uvw(xyz, ha=-newphasecentre.ra.rad, dec=newphasecentre.dec.rad)[
...]
newvis.phasecentre = newphasecentre
return newvis
elif isinstance(vis, BlockVisibility):
k = numpy.array(vis.frequency) / constants.c.to('m s^-1').value
uvw = vis.uvw[..., numpy.newaxis] * k
phasor = numpy.ones_like(vis.vis, dtype='complex')
_, _, _, nchan, npol = vis.vis.shape
for chan in range(nchan):
phasor[:, :, :, chan, :] = simulate_point(uvw[..., chan], l, m)[..., numpy.newaxis]
if inverse:
newvis.data['vis'] *= phasor
else:
newvis.data['vis'] *= numpy.conj(phasor)
# To rotate UVW, rotate into the global XYZ coordinate system and back. We have the option of
# staying on the tangent plane or not. If we stay on the tangent then the raster will
# join smoothly at the edges. If we change the tangent then we will have to reproject to get
# the results on the same image, in which case overlaps or gaps are difficult to deal with.
if not tangent:
# UVW is shape [nants, nants, 3], we want [nants * nants, 3]
nrows, nants, _, _ = vis.uvw.shape
uvw_linear = vis.uvw.reshape([nrows * nants * nants, 3])
if inverse:
xyz = uvw_to_xyz(uvw_linear, ha=-newvis.phasecentre.ra.rad, dec=newvis.phasecentre.dec.rad)
uvw_linear = \
xyz_to_uvw(xyz, ha=-newphasecentre.ra.rad, dec=newphasecentre.dec.rad)[...]
else:
# This is the original (non-inverse) code
xyz = uvw_to_xyz(uvw_linear, ha=-newvis.phasecentre.ra.rad, dec=newvis.phasecentre.dec.rad)
uvw_linear = \
xyz_to_uvw(xyz, ha=-newphasecentre.ra.rad, dec=newphasecentre.dec.rad)[...]
newvis.phasecentre = newphasecentre
newvis.data['uvw'][...] = uvw_linear.reshape([nrows, nants, nants, 3])
return newvis
else:
raise ValueError("vis argument neither Visibility or BlockVisibility")
[docs]def export_blockvisibility_to_ms(msname, vis_list, source_name=None, ack=False):
""" Minimal BlockVisibility to MS converter
The MS format is much more general than the ARL BlockVisibility so we cut many corners. This requires casacore to be
installed. If not an exception ModuleNotFoundError is raised.
Write a list of BlockVisibility's to a MS file, split by field and spectral window
:param msname: File name of MS
:param vislist: BlockVisibility
:return:
"""
try:
import casacore.tables.tableutil as pt
from casacore.tables import (makescacoldesc, makearrcoldesc, table, maketabdesc, tableexists, tableiswritable,
tableinfo, tablefromascii, tabledelete, makecoldesc, msconcat, removeDerivedMSCal,
taql, tablerename, tablecopy, tablecolumn, addDerivedMSCal, removeImagingColumns,
addImagingColumns, required_ms_desc, tabledefinehypercolumn, default_ms, makedminfo,
default_ms_subtable)
from processing_components.visibility.msv2fund import Antenna, Stand
except ModuleNotFoundError:
raise ModuleNotFoundError("casacore is not installed")
try:
from processing_components.visibility import msv2
except ModuleNotFoundError:
raise ModuleNotFoundError("cannot import msv2")
# log.debug("create_blockvisibility_from_ms: %s" % str(tab.info()))
# Start the table
tbl = msv2.Ms(msname, ref_time=0, source_name= source_name, if_delete=True)
if source_name is None:
source_name = 'ARL'
for vis in vis_list:
# Check polarisition
npol = vis.npol
nchan = vis.nchan
nants = vis.nants
if vis.polarisation_frame.type == 'linear':
polarization = ['XX','XY','YX','YY']
elif vis.polarisation_frame.type == 'stokesI':
polarization = ['XX']
elif vis.polarisation_frame.type == 'circular':
polarization = ['RR','RL','LR','LL']
elif vis.polarisation_frame.type == 'stokesIQUV':
polarization = ['I','Q','U','V']
# Current ARL suppots I
tbl.set_stokes(polarization)
tbl.set_frequency(vis.frequency,vis.channel_bandwidth)
n_ant = len(vis.configuration.xyz)
antennas = []
names = vis.configuration.names
mount = vis.configuration.mount
diameter = vis.configuration.diameter
xyz = vis.configuration.xyz
for i in range(len(names)):
antennas.append(Antenna(i, Stand(names[i], xyz[i, 0], xyz[i, 1], xyz[i, 2])))
# Set baselines and data
blList = []
antennas2 = antennas
for i in range(0, n_ant - 1):
for j in range(i + 1, n_ant):
blList.append((antennas[i], antennas2[j]))
tbl.set_geometry(vis.configuration, antennas)
nbaseline = len(blList)
ntimes = len(vis.data['time'])
ms_vis = numpy.zeros([ntimes, nbaseline, nchan, npol]).astype('complex')
ms_uvw = numpy.zeros([ntimes, nbaseline, 3])
# bv_vis = numpy.zeros([ntimes, nants, nants, nchan, npol]).astype('complex')
# bv_weight = numpy.zeros([ntimes, nants, nants, nchan, npol])
# bv_imaging_weight = numpy.zeros([ntimes, nants, nants, nchan, npol])
# bv_uvw = numpy.zeros([ntimes, nants, nants, 3])
time = vis.data['time']
int_time = vis.data['integration_time']
bv_vis = vis.data['vis']
bv_uvw = vis.data['uvw']
# bv_antenna1 = vis.data['antenna1']
# bv_antenna2 = vis.data['antenna2']
time_last = time[0]
time_index = 0
for row,_ in enumerate(time):
# MS has shape [row, npol, nchan]
# BV has shape [ntimes, nants, nants, nchan, npol]
bl = 0
for i in range(0, n_ant - 1):
for j in range(i + 1, n_ant):
ms_vis[row, bl,...] = bv_vis[row, j, i, ...]
# bv_weight[time_index, antenna2[row], antenna1[row], :, ...] = ms_weight[row, numpy.newaxis, ...]
# bv_imaging_weight[time_index, antenna2[row], antenna1[row], :, ...] = ms_weight[row, numpy.newaxis, ...]
ms_uvw[row,bl,:] = bv_uvw[row, j, i, :]
bl += 1
# ms_vis = numpy.zeros([ntimes*vis.nants*vis.nants, vis.nchan, vis.npol]).astype('complex')
# ms_uvw = vis.uvw.reshape(ntimes,-1,3)
for ntime, time in enumerate(vis.data['time']):
for ipol, pol in enumerate(polarization):
if int_time[ntime] is not None:
tbl.add_data_set(time, int_time[ntime], blList, ms_vis[ntime, ..., ipol], pol=pol, source=source_name,
phasecentre=vis.phasecentre, uvw=ms_uvw[ntime, :, :])
else:
tbl.add_data_set(time, 0, blList, ms_vis[ntime, ..., ipol], pol=pol,
source=source_name, phasecentre = vis.phasecentre, uvw=ms_uvw[ntime, :, :])
tbl.write()
[docs]def list_ms(msname, ack=False):
""" List sources and data descriptors in a MeasurementSet
:param msname: File name of MS
:return:
"""
try:
from casacore.tables import table # pylint: disable=import-error
except ModuleNotFoundError:
raise ModuleNotFoundError("casacore is not installed")
try:
from processing_components.visibility import msv2
except ModuleNotFoundError:
raise ModuleNotFoundError("cannot import msv2")
tab = table(msname, ack=ack)
log.debug("list_ms: %s" % str(tab.info()))
fieldtab = table('%s/FIELD' % msname, ack=False)
sources = fieldtab.getcol('NAME')
dds = list(numpy.unique(tab.getcol('DATA_DESC_ID')))
return sources, dds
[docs]def create_blockvisibility_from_ms(msname, channum=None, start_chan=None, end_chan=None, ack=False,
datacolumn='DATA', selected_sources=None, selected_dds=None):
""" Minimal MS to BlockVisibility converter
The MS format is much more general than the ARL BlockVisibility so we cut many corners. This requires casacore to be
installed. If not an exception ModuleNotFoundError is raised.
Creates a list of BlockVisibility's, split by field and spectral window
Reading of a subset of channels is possible using either start_chan and end_chan or channnum. Using start_chan
and end_chan is preferred since it only reads the channels required. Channum is more flexible and can be used to
read a random list of channels.
:param msname: File name of MS
:param channum: range of channels e.g. range(17,32), default is None meaning all
:param start_chan: Starting channel to read
:param end_chan: End channel to read
:return:
"""
try:
from casacore.tables import table # pylint: disable=import-error
except ModuleNotFoundError:
raise ModuleNotFoundError("casacore is not installed")
try:
from processing_components.visibility import msv2
except ModuleNotFoundError:
raise ModuleNotFoundError("cannot import msv2")
tab = table(msname, ack=ack)
log.debug("create_blockvisibility_from_ms: %s" % str(tab.info()))
if selected_sources is None:
fields = numpy.unique(tab.getcol('FIELD_ID'))
else:
fieldtab = table('%s/FIELD' % msname, ack=False)
sources = fieldtab.getcol('NAME')
fields = list()
for field, source in enumerate(sources):
if source in selected_sources: fields.append(field)
assert len(fields) > 0, "No sources selected"
if selected_dds is None:
dds = numpy.unique(tab.getcol('DATA_DESC_ID'))
else:
dds = selected_dds
log.debug("create_blockvisibility_from_ms: Reading unique fields %s, unique data descriptions %s" % (
str(fields), str(dds)))
vis_list = list()
for field in fields:
ftab = table(msname, ack=ack).query('FIELD_ID==%d' % field, style='')
for dd in dds:
meta = {'MSV2':{'FIELD_ID': field, 'DATA_DESC_ID':dd}}
ms = ftab.query('DATA_DESC_ID==%d' % dd, style='')
assert ms.nrows() > 0, "Empty selection for FIELD_ID=%d and DATA_DESC_ID=%d" % (field, dd)
log.debug("create_blockvisibility_from_ms: Found %d rows" % (ms.nrows()))
# The TIME column has descriptor:
# {'valueType': 'double', 'dataManagerType': 'IncrementalStMan', 'dataManagerGroup': 'TIME',
# 'option': 0, 'maxlen': 0, 'comment': 'Modified Julian Day',
# 'keywords': {'QuantumUnits': ['s'], 'MEASINFO': {'type': 'epoch', 'Ref': 'UTC'}}}
otime = ms.getcol('TIME')
datacol = ms.getcol(datacolumn, nrow=1)
datacol_shape = list(datacol.shape)
channels = datacol.shape[-2]
log.debug("create_blockvisibility_from_ms: Found %d channels" % (channels))
if channum is None:
if start_chan is not None and end_chan is not None:
try:
log.debug("create_blockvisibility_from_ms: Reading channels from %d to %d" %
(start_chan, end_chan))
print("create_blockvisibility_from_ms: Reading channels from %d to %d (inclusive)" %
(start_chan, end_chan))
blc = [start_chan, 0]
trc = [end_chan, datacol_shape[-1] - 1]
channum = range(start_chan, end_chan+1)
ms_vis = ms.getcolslice(datacolumn, blc=blc, trc=trc)
ms_weight = ms.getcol('WEIGHT')
except IndexError:
raise IndexError("channel number exceeds max. within ms")
else:
log.debug("create_blockvisibility_from_ms: Reading all %d channels" % (channels))
try:
channum = range(channels)
ms_vis = ms.getcol(datacolumn)[:, channum, :]
ms_weight = ms.getcol('WEIGHT')
channum = range(channels)
except IndexError:
raise IndexError("channel number exceeds max. within ms")
else:
log.debug("create_blockvisibility_from_ms: Reading channels %s " % (channum))
channum = range(channels)
try:
ms_vis = ms.getcol(datacolumn)[:, channum, :]
ms_weight = ms.getcol('WEIGHT')[:, :]
except IndexError:
raise IndexError("channel number exceeds max. within ms")
uvw = -1 * ms.getcol('UVW')
antenna1 = ms.getcol('ANTENNA1')
antenna2 = ms.getcol('ANTENNA2')
integration_time = ms.getcol('INTERVAL')
# time = Time((time-integration_time/2.0)/86400+ 2400000.5,format='jd',scale='utc').utc.value
time = (otime - integration_time / 2.0)
start_time = numpy.min(time)/86400.0
end_time = numpy.max(time)/86400.0
log.debug("create_blockvisibility_from_ms: Observation from %s to %s" %
(Time(start_time, format='mjd').iso, Time(end_time, format='mjd').iso))
# Now get info from the subtables
spwtab = table('%s/SPECTRAL_WINDOW' % msname, ack=False)
cfrequency = spwtab.getcol('CHAN_FREQ')[dd][channum]
cchannel_bandwidth = spwtab.getcol('CHAN_WIDTH')[dd][channum]
nchan = cfrequency.shape[0]
# Get polarisation info
npol = 4
poltab = table('%s/POLARIZATION' % msname, ack=False)
corr_type = poltab.getcol('CORR_TYPE')
# These correspond to the CASA Stokes enumerations
if numpy.array_equal(corr_type[0], [1, 2, 3, 4]):
polarisation_frame = PolarisationFrame('stokesIQUV')
elif numpy.array_equal(corr_type[0], [5, 6, 7, 8]):
polarisation_frame = PolarisationFrame('circular')
elif numpy.array_equal(corr_type[0], [9, 10, 11, 12]):
polarisation_frame = PolarisationFrame('linear')
elif numpy.array_equal(corr_type[0], [9]):
npol = 1
polarisation_frame = PolarisationFrame('stokesI')
else:
raise KeyError("Polarisation not understood: %s" % str(corr_type))
# Get configuration
anttab = table('%s/ANTENNA' % msname, ack=False)
nants = anttab.nrows()
mount = anttab.getcol('MOUNT')
names = anttab.getcol('NAME')
diameter = anttab.getcol('DISH_DIAMETER')
xyz = anttab.getcol('POSITION')
configuration = Configuration(name='', data=None, location=None,
names=names, xyz=xyz, mount=mount, frame=None,
receptor_frame=ReceptorFrame("linear"),
diameter=diameter)
# Get phasecentres
fieldtab = table('%s/FIELD' % msname, ack=False)
pc = fieldtab.getcol('PHASE_DIR')[field, 0, :]
source = fieldtab.getcol('NAME')[field]
phasecentre = SkyCoord(ra=pc[0] * u.rad, dec=pc[1] * u.rad, frame='icrs', equinox='J2000')
time_index_row = numpy.zeros_like(time, dtype='int')
time_last = time[0]
time_index = 0
for row, _ in enumerate(time):
if time[row] > time_last + integration_time[row]:
assert time[row] > time_last, "MS is not time-sorted - cannot convert"
time_index += 1
time_last = time[row]
time_index_row[row] = time_index
ntimes = time_index + 1
bv_times = numpy.zeros([ntimes])
bv_vis = numpy.zeros([ntimes, nants, nants, nchan, npol]).astype('complex')
bv_weight = numpy.zeros([ntimes, nants, nants, nchan, npol])
bv_imaging_weight = numpy.zeros([ntimes, nants, nants, nchan, npol])
bv_uvw = numpy.zeros([ntimes, nants, nants, 3])
bv_integration_time = numpy.zeros([ntimes])
for row, _ in enumerate(time):
time_index = time_index_row[row]
bv_times[time_index] = time[row]
bv_vis[time_index, antenna2[row], antenna1[row], ...] = ms_vis[row, ...]
bv_weight[time_index, antenna2[row], antenna1[row], :, ...] = ms_weight[row, numpy.newaxis, ...]
bv_imaging_weight[time_index, antenna2[row], antenna1[row], :, ...] = ms_weight[row, numpy.newaxis, ...]
bv_uvw[time_index, antenna2[row], antenna1[row], :] = uvw[row, :]
bv_integration_time[time_index] = integration_time[row]
vis_list.append(BlockVisibility(uvw=bv_uvw,
time=bv_times,
frequency=cfrequency,
channel_bandwidth=cchannel_bandwidth,
vis=bv_vis,
weight=bv_weight,
integration_time = bv_integration_time,
imaging_weight=bv_imaging_weight,
configuration=configuration,
phasecentre=phasecentre,
polarisation_frame=polarisation_frame,
source=source, meta=meta))
tab.close()
return vis_list
[docs]def create_visibility_from_ms(msname, channum=None, start_chan=None, end_chan=None, ack=False):
""" Minimal MS to BlockVisibility converter
The MS format is much more general than the ARL BlockVisibility so we cut many corners. This requires casacore to be
installed. If not an exception ModuleNotFoundError is raised.
Creates a list of BlockVisibility's, split by field and spectral window
Reading of a subset of channels is possible using either start_chan and end_chan or channnum. Using start_chan
and end_chan is preferred since it only reads the channels required. Channum is more flexible and can be used to
read a random list of channels.
:param msname: File name of MS
:param channum: range of channels e.g. range(17,32), default is None meaning all
:param start_chan: Starting channel to read
:param end_chan: End channel to read
:return:
"""
from processing_components.visibility.coalesce import convert_blockvisibility_to_visibility
return [convert_blockvisibility_to_visibility(v)
for v in create_blockvisibility_from_ms(msname=msname, channum=channum,
start_chan=start_chan, end_chan=end_chan, ack=ack)]
[docs]def create_blockvisibility_from_uvfits(fitsname, channum=None, ack=False, antnum=None):
""" Minimal UVFIT to BlockVisibility converter
The UVFITS format is much more general than the ARL BlockVisibility so we cut many corners.
Creates a list of BlockVisibility's, split by field and spectral window
:param fitsname: File name of UVFITS
:param channum: range of channels e.g. range(17,32), default is None meaning all
:param antnum: the number of antenna
:return:
"""
def ParamDict(hdul):
"Return the dictionary of the random parameters"
"""
The keys of the dictionary are the parameter names uppercased for
consistency. The values are the column numbers.
If multiple parameters have the same name (e.g., DATE) their
columns are entered as a list.
"""
pre=re.compile(r"PTYPE(?P<i>\d+)")
res={}
for k,v in hdul.header.items():
m=pre.match(k)
if m :
vu=v.upper()
if vu in res:
res[ vu ] = [ res[vu], int(m.group("i")) ]
else:
res[ vu ] = int(m.group("i"))
return res
# Open the file
with fits.open(fitsname) as hdul:
# Read Spectral Window
nspw = hdul[0].header['NAXIS5']
# Read Channel and Frequency Interval
freq_ref = hdul[0].header['CRVAL4']
mid_chan_freq = hdul[0].header['CRPIX4']
delt_freq = hdul[0].header['CDELT4']
# Real the number of channels in one spectral window
channels = hdul[0].header['NAXIS4']
freq = numpy.zeros([nspw, channels])
# Read Frequency or IF
freqhdulname="AIPS FQ"
sdhu = hdul.index_of(freqhdulname)
if_freq = hdul[sdhu].data['IF FREQ'].ravel()
for i in range(nspw):
temp = numpy.array([if_freq[i] + freq_ref+delt_freq* ff for ff in range(channels)])
freq[i,:] = temp[:]
freq_delt = numpy.ones(channels) * delt_freq
if channum is None:
channum = range(channels)
primary = hdul[0].data
# Read time
bvtimes = Time(hdul[0].data['DATE'], hdul[0].data['_DATE'], format='jd')
bv_times = numpy.unique(bvtimes.jd)
ntimes = len(bv_times)
# # Get Antenna
# blin = hdul[0].data['BASELINE']
antennahdulname="AIPS AN"
adhu = hdul.index_of(antennahdulname)
try:
antenna_name = hdul[adhu].data['ANNAME']
antenna_name = antenna_name.encode('ascii','ignore')
except:
antenna_name = None
antenna_xyz = hdul[adhu].data['STABXYZ']
antenna_mount = hdul[adhu].data['MNTSTA']
try:
antenna_diameter = hdul[adhu].data['DIAMETER']
except:
antenna_diameter = None
# To reading some UVFITS with wrong numbers of antenna
if antnum is not None:
if antenna_name is not None:
antenna_name = antenna_name[:antnum]
antenna_xyz = antenna_xyz[:antnum]
antenna_mount = antenna_mount[:antnum]
if antenna_diameter is not None:
antenna_diameter = antenna_diameter[:antnum]
nants = len(antenna_xyz)
# res= {}
# for i,row in enumerate(fin[ahdul].data):
# res[row.field("ANNAME") ] = i +1
# Get polarisation info
npol = hdul[0].header['NAXIS3']
corr_type = numpy.arange(hdul[0].header['NAXIS3']) - (hdul[0].header['CRPIX3'] - 1)
corr_type *= hdul[0].header['CDELT3']
corr_type += hdul[0].header['CRVAL3']
# xx yy xy yx
# These correspond to the CASA Stokes enumerations
if numpy.array_equal(corr_type, [1, 2, 3, 4]):
polarisation_frame = PolarisationFrame('stokesIQUV')
elif numpy.array_equal(corr_type, [-1, -2, -3, -4]):
polarisation_frame = PolarisationFrame('circular')
elif numpy.array_equal(corr_type, [-5, -6, -7, -8]):
polarisation_frame = PolarisationFrame('linear')
else:
raise KeyError("Polarisation not understood: %s" % str(corr_type))
configuration = Configuration(name='', data=None, location=None,
names=antenna_name, xyz=antenna_xyz, mount=antenna_mount, frame=None,
receptor_frame=polarisation_frame,
diameter=antenna_diameter)
# Get RA and DEC
phase_center_ra_degrees = numpy.float(hdul[0].header['CRVAL6'])
phase_center_dec_degrees = numpy.float(hdul[0].header['CRVAL7'])
# Get phasecentres
phasecentre = SkyCoord(ra=phase_center_ra_degrees * u.deg, dec=phase_center_dec_degrees * u.deg, frame='icrs', equinox='J2000')
# Get UVW
d=ParamDict(hdul[0])
if "UU" in d:
uu = hdul[0].data['UU']
vv = hdul[0].data['VV']
ww = hdul[0].data['WW']
else:
uu = hdul[0].data['UU---SIN']
vv = hdul[0].data['VV---SIN']
ww = hdul[0].data['WW---SIN']
_vis = hdul[0].data['DATA']
#_vis.shape = (nchan, ntimes, (nants*(nants-1)//2 ), npol, -1)
#self.vis = -(_vis[...,0] * 1.j + _vis[...,1])
row = 0
nchan = len(channum)
vis_list = list()
for spw_index in range(nspw):
bv_vis = numpy.zeros([ntimes, nants, nants, nchan, npol]).astype('complex')
bv_weight = numpy.zeros([ntimes, nants, nants, nchan, npol])
bv_uvw = numpy.zeros([ntimes, nants, nants, 3])
for time_index , time in enumerate(bv_times):
#restfreq = freq[channel_index]
for antenna1 in range(nants-1):
for antenna2 in range(antenna1 + 1, nants):
for channel_no, channel_index in enumerate(channum):
for pol_index in range(npol):
bv_vis[time_index, antenna2, antenna1, channel_no,pol_index] = complex(_vis[row,:,:,spw_index,channel_index, pol_index ,0],_vis[row,:,:,spw_index,channel_index,pol_index ,1])
bv_weight[time_index, antenna2, antenna1, channel_no, pol_index] = _vis[row,:,:,spw_index,channel_index,pol_index ,2]
bv_uvw[time_index, antenna2, antenna1, 0] = uu[row]* constants.c.value
bv_uvw[time_index, antenna2, antenna1, 1] = vv[row]* constants.c.value
bv_uvw[time_index, antenna2, antenna1, 2] = ww[row]* constants.c.value
row += 1
vis_list.append(BlockVisibility(uvw=bv_uvw,
time=bv_times,
frequency=freq[spw_index][channum],
channel_bandwidth=freq_delt[channum],
vis=bv_vis,
weight=bv_weight,
imaging_weight= bv_weight,
configuration=configuration,
phasecentre=phasecentre,
polarisation_frame=polarisation_frame))
return vis_list
[docs]def create_visibility_from_uvfits(fitsname, channum=None, ack=False, antnum=None):
""" Minimal UVFITS to BlockVisibility converter
Creates a list of BlockVisibility's, split by field and spectral window
:param fitsname: File name of UVFITS file
:param channum: range of channels e.g. range(17,32), default is None meaning all
:param antnum: the number of antenna
:return:
"""
from processing_components.visibility.coalesce import convert_blockvisibility_to_visibility
return [convert_blockvisibility_to_visibility(v)
for v in create_blockvisibility_from_uvfits(fitsname=fitsname, channum=channum, ack=ack, antnum=antnum)]