"""Useful array functions.
"""
__all__ = ['average_chunks', 'average_chunks2', 'tukey_filter', 'insert_array',
           'insert_function_L', 'insert_function_pswf', 'insert_function_sinc']

# import numba
import numpy
# @numba.jit([numba.types.Tuple((numba.float64[:], numba.float64[:]))
# (numba.float64[:], numba.float64[:], numba.int64)], nopython=True)
def average_chunks_jit(arr, wts, chunksize):
    """ Average the array arr, weighted by wts, in chunks of length chunksize.

    The array length does not have to be a multiple of chunksize.

    This version is written for numba. When compiled with numba.jit it is about 25 - 30% faster
    than the plain numpy version run without jit.

    :param arr: 1D array of values
    :param wts: 1D array of weights
    :param chunksize: averaging size
    :return: 1D array of averaged data, 1D array of weights
    """
    if chunksize <= 1:
        return arr, wts

    # Number of full chunks, plus one partial chunk for any leftover samples
    nchunks = len(arr) // chunksize
    extra = len(arr) % chunksize
    if extra > 0:
        fullsize = nchunks + 1
    else:
        fullsize = nchunks

    chunks = numpy.empty(fullsize, dtype=arr.dtype)
    weights = numpy.empty(fullsize, dtype=wts.dtype)

    for place in range(nchunks):
        chunks[place] = numpy.sum(
            wts[place * chunksize:(place + 1) * chunksize] * arr[place * chunksize:(place + 1) * chunksize])
        weights[place] = numpy.sum(wts[place * chunksize:(place + 1) * chunksize])

    if extra > 0:
        chunks[-1] = numpy.sum(wts[(len(arr) - extra):len(arr)] * arr[(len(arr) - extra):len(arr)])
        weights[-1] = numpy.sum(wts[(len(arr) - extra):len(arr)])

    # Normalise by the summed weights, avoiding division by zero
    chunks[weights > 0.0] = chunks[weights > 0.0] / weights[weights > 0.0]

    return chunks, weights
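
# Worked example of the bookkeeping above: for len(arr) == 10 and chunksize == 3,
# nchunks == 3 and extra == 1, so fullsize == 4 and the final output element
# averages only the single leftover sample.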

def average_chunks(arr, wts, chunksize):
    """ Average the array arr, weighted by wts, in chunks of length chunksize.

    The array length does not have to be a multiple of chunksize.

    This version is optimised for plain numpy. It is roughly ten times faster than
    average_chunks_jit when the latter is run without numba jit. The original add.reduceat
    version cannot (yet) be compiled with numba because add.reduceat is not supported in
    numba 0.31.

    :param arr: 1D array of values
    :param wts: 1D array of weights
    :param chunksize: averaging size
    :return: 1D array of averaged data, 1D array of weights
    """
    if chunksize <= 1:
        return arr, wts

    # Original code:
    # places = range(0, len(arr), chunksize)
    # chunks = numpy.add.reduceat(wts * arr, places)
    # weights = numpy.add.reduceat(wts, places)
    # chunks[weights > 0.0] = chunks[weights > 0.0] / weights[weights > 0.0]

    # Optimised code: build a boolean selection matrix with one row per chunk, then
    # reduce the weighted data and the weights with a single matrix product each
    mask = numpy.zeros(((len(arr) - 1) // chunksize + 1, arr.shape[0]), dtype=bool)
    for enumerate_id, i in enumerate(range(0, len(arr), chunksize)):
        mask[enumerate_id, i:i + chunksize] = 1
    chunks = mask.dot(wts * arr)
    weights = mask.dot(wts)

    # Normalise by the summed weights, avoiding division by zero
    # chunks[weights > 0.0] = chunks[weights > 0.0] / weights[weights > 0.0]
    numpy.putmask(chunks, weights > 0.0, chunks / weights)
    return chunks, weights
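
# Example usage (a minimal sketch): averaging ten samples with unit weights in
# chunks of three gives three full chunks plus one leftover sample:
#
#   arr = numpy.arange(10.0)
#   wts = numpy.ones(10)
#   chunks, weights = average_chunks(arr, wts, 3)
#   # chunks  -> [1.0, 4.0, 7.0, 9.0]
#   # weights -> [3.0, 3.0, 3.0, 1.0]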

def average_chunks2(arr, wts, chunksize):
    """ Average the two-dimensional array arr, weighted by wts, in chunks.

    The array shape does not have to be a multiple of chunksize along either axis.

    :param arr: 2D array of values
    :param wts: 2D array of weights
    :param chunksize: 2-tuple of averaging region e.g. (2, 3)
    :return: 2D array of averaged data, 2D array of weights
    """
    # Do each axis first to determine the output lengths
    # assert arr.shape == wts.shape, "Shapes of arrays must be the same"
    # It is possible that there is a dangling null axis on wts
    wts = wts.reshape(arr.shape)

    l0 = len(average_chunks(arr[:, 0], wts[:, 0], chunksize[0])[0])
    l1 = len(average_chunks(arr[0, :], wts[0, :], chunksize[1])[0])

    # First average along axis 1, then average the intermediate result along axis 0
    tempchunks = numpy.zeros([arr.shape[0], l1], dtype=arr.dtype)
    tempwt = numpy.zeros([arr.shape[0], l1])
    for i in range(arr.shape[0]):
        result = average_chunks(arr[i, :], wts[i, :], chunksize[1])
        tempchunks[i, :], tempwt[i, :] = result[0].flatten(), result[1].flatten()

    chunks = numpy.zeros([l0, l1], dtype=arr.dtype)
    weights = numpy.zeros([l0, l1])
    for i in range(l1):
        result = average_chunks(tempchunks[:, i], tempwt[:, i], chunksize[0])
        chunks[:, i], weights[:, i] = result[0].flatten(), result[1].flatten()

    return chunks, weights
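
# Example usage (a minimal sketch): block-averaging a 4 x 6 array of ones with
# chunksize (2, 3) gives a 2 x 2 result, each block summing 2 * 3 unit weights:
#
#   arr = numpy.ones((4, 6))
#   wts = numpy.ones((4, 6))
#   chunks, weights = average_chunks2(arr, wts, (2, 3))
#   # chunks  -> all 1.0, shape (2, 2)
#   # weights -> all 6.0, shape (2, 2)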

def tukey_filter(x, r):
    """ Calculate the Tukey (tapered cosine) filter at a point.

    See e.g. https://uk.mathworks.com/help/signal/ref/tukeywin.html

    :param x: x coordinate in [0, 1] (float)
    :param r: taper fraction of the filter (float); the cosine taper occupies r/2 at each end
    :returns: Value of filter at x
    """
    if 0.0 <= x < r / 2.0:
        return 0.5 * (1.0 + numpy.cos(2.0 * numpy.pi * (x - r / 2.0) / r))
    elif 1 - r / 2.0 <= x <= 1.0:
        return 0.5 * (1.0 + numpy.cos(2.0 * numpy.pi * (x - 1 + r / 2.0) / r))
    else:
        return 1.0
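
# Example values (a quick check) for r = 0.5: the filter rises from
# tukey_filter(0.0, 0.5) == 0.0 through tukey_filter(0.125, 0.5) == 0.5 to a
# flat top of 1.0 over [0.25, 0.75], then falls symmetrically back to
# tukey_filter(1.0, 0.5) == 0.0.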

def insert_function_sinc(x):
    """ Normalised sinc function sin(pi x) / (pi x), with sinc(0) == 1 """
    # Handle x == 0 separately to avoid division by zero
    s = numpy.ones_like(x)
    s[x != 0.0] = numpy.sin(numpy.pi * x[x != 0.0]) / (numpy.pi * x[x != 0.0])
    return s
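
# Example (a minimal sketch): the kernel is 1.0 at the origin and vanishes at
# all other integers:
#
#   insert_function_sinc(numpy.array([-1.0, -0.5, 0.0, 0.5, 1.0]))
#   # -> [0.0, 0.6366..., 1.0, 0.6366..., 0.0]   (2 / pi ~ 0.6366)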

def insert_function_L(x, a=5):
    """ Lanczos-style kernel L(x) = sinc(x) * sinc(x / a) """
    L = insert_function_sinc(x) * insert_function_sinc(x / a)
    return L
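
# Example (a minimal sketch): insert_function_L is also 1.0 at the origin and
# zero at the other integers, while the sinc(x / a) factor damps the side lobes,
# e.g. insert_function_L(numpy.array([2.5]), a=5) -> [~0.081].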

def insert_function_pswf(x, a=5):
    """ Prolate spheroidal wave function kernel, evaluated via grdsf """
    from processing_library.fourier_transforms.convolutional_gridding import grdsf
    return grdsf(abs(x) / a)[1]

def insert_array(im, x, y, flux, bandwidth=1.0, support=7, insert_function=insert_function_L):
    """ Insert a point source into an image using the specified interpolation function.

    :param im: Image array [nchan, npol, ny, nx]
    :param x: x position in float pixels
    :param y: y position in float pixels
    :param flux: Flux [nchan, npol]
    :param bandwidth: Support of data in the uv plane
    :param support: Support of the function in image space
    :param insert_function: insert_function_L, insert_function_sinc, or insert_function_pswf
    :return: Image with the point source added
    """
    nchan, npol, ny, nx = im.shape
    # Split the position into integer pixel and sub-pixel fractional parts
    intx = int(numpy.round(x))
    inty = int(numpy.round(y))
    fracx = x - intx
    fracy = y - inty
    gridx = numpy.arange(-support, support)
    gridy = numpy.arange(-support, support)

    # Separable 2D interpolation kernel, shifted by the sub-pixel offset
    insert = numpy.outer(insert_function(bandwidth * (gridy - fracy)),
                         insert_function(bandwidth * (gridx - fracx)))

    # Normalise the kernel so that the total inserted flux is preserved
    insertsum = numpy.sum(insert)
    assert insertsum > 0, "Sum of interpolation coefficients %g" % insertsum
    insert = insert / insertsum

    for chan in range(nchan):
        for pol in range(npol):
            im[chan, pol, inty - support:inty + support, intx - support:intx + support] += flux[chan, pol] * insert
    return im
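
# Example usage (a minimal sketch): insert unit flux at a sub-pixel position in
# a one-channel, one-polarisation image:
#
#   im = numpy.zeros([1, 1, 256, 256])
#   flux = numpy.array([[1.0]])
#   im = insert_array(im, 128.3, 127.8, flux, insert_function=insert_function_L)
#   # The flux is spread over a (2 * support) x (2 * support) patch centred
#   # near pixel (128, 128); since the kernel is normalised, im.sum() ~ 1.0.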