Pipeline processing using arlexecute workflows. =============================================== This notebook demonstrates the continuum imaging and ICAL pipelines. These are based on ARL functions wrapped up as SDP workflows using the arlexecute class. .. code:: ipython3 %matplotlib inline import os import sys sys.path.append(os.path.join('..', '..')) from data_models.parameters import arl_path results_dir = arl_path('test_results') from matplotlib import pylab pylab.rcParams['figure.figsize'] = (12.0, 12.0) pylab.rcParams['image.cmap'] = 'rainbow' import numpy from astropy.coordinates import SkyCoord from astropy import units as u from astropy.wcs.utils import pixel_to_skycoord from matplotlib import pyplot as plt from data_models.polarisation import PolarisationFrame from processing_components.calibration.calibration import solve_gaintable from processing_components.calibration import apply_gaintable from processing_components.calibration.calibration_control import create_calibration_controls from processing_components.visibility import create_blockvisibility from processing_components.skycomponent import create_skycomponent from processing_components.image import deconvolve_cube from processing_components.image import show_image, export_image_to_fits, qa_image from processing_components.visibility import vis_timeslice_iter from processing_components.simulation.testing_support import create_low_test_image_from_gleam from processing_components.simulation.configurations import create_named_configuration from processing_components.imaging import predict_2d, create_image_from_visibility, advise_wide_field from processing_components.visibility import convert_blockvisibility_to_visibility from workflows.arlexecute.imaging.imaging_arlexecute import invert_list_arlexecute_workflow, \ predict_list_arlexecute_workflow, deconvolve_list_arlexecute_workflow from workflows.arlexecute.simulation.simulation_arlexecute import simulate_list_arlexecute_workflow, \ corrupt_list_arlexecute_workflow from workflows.arlexecute.pipelines.pipeline_arlexecute import continuum_imaging_list_arlexecute_workflow, \ ical_list_arlexecute_workflow from wrappers.arlexecute.execution_support.arlexecute import arlexecute import pprint pp = pprint.PrettyPrinter() import logging def init_logging(): log = logging.getLogger() logging.basicConfig(filename='%s/imaging-pipeline.log' % results_dir, filemode='a', format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', datefmt='%H:%M:%S', level=logging.INFO) log = logging.getLogger() logging.info("Starting imaging-pipeline") We will use dask .. code:: ipython3 arlexecute.set_client(use_dask=True) arlexecute.run(init_logging) .. parsed-literal:: /home/jenkins-slave/workspace/ce-library_feature-improved-docs/_build/lib/python3.5/site-packages/distributed/dashboard/core.py:72: UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the diagnostics dashboard on a random port instead. warnings.warn("\n" + msg) .. parsed-literal:: {'tcp://127.0.0.1:36259': None, 'tcp://127.0.0.1:42581': None, 'tcp://127.0.0.1:43559': None, 'tcp://127.0.0.1:45117': None} .. code:: ipython3 pylab.rcParams['figure.figsize'] = (12.0, 12.0) pylab.rcParams['image.cmap'] = 'Greys' We create a graph to make the visibility. The parameter rmax determines the distance of the furthest antenna/stations used. All over parameters are determined from this number. .. code:: ipython3 nfreqwin=7 ntimes=5 rmax=300.0 frequency=numpy.linspace(1e8,1.2e8,nfreqwin) channel_bandwidth=numpy.array(nfreqwin*[frequency[1]-frequency[0]]) times = numpy.linspace(-numpy.pi/3.0, numpy.pi/3.0, ntimes) phasecentre=SkyCoord(ra=+30.0 * u.deg, dec=-60.0 * u.deg, frame='icrs', equinox='J2000') bvis_list=simulate_list_arlexecute_workflow('LOWBD2', frequency=frequency, channel_bandwidth=channel_bandwidth, times=times, phasecentre=phasecentre, order='frequency', rmax=rmax, format='blockvis') vis_list = [arlexecute.execute(convert_blockvisibility_to_visibility)(bv) for bv in bvis_list] print('%d elements in vis_list' % len(vis_list)) log.info('About to make visibility') vis_list = arlexecute.compute(vis_list, sync=True) .. parsed-literal:: 7 elements in vis_list .. code:: ipython3 wprojection_planes=1 advice_low=advise_wide_field(vis_list[0], guard_band_image=8.0, delA=0.02, wprojection_planes=wprojection_planes) advice_high=advise_wide_field(vis_list[-1], guard_band_image=8.0, delA=0.02, wprojection_planes=wprojection_planes) vis_slices = advice_low['vis_slices'] npixel=advice_high['npixels2'] cellsize=min(advice_low['cellsize'], advice_high['cellsize']) Now make a graph to fill with a model drawn from GLEAM .. code:: ipython3 gleam_model = [arlexecute.execute(create_low_test_image_from_gleam)(npixel=npixel, frequency=[frequency[f]], channel_bandwidth=[channel_bandwidth[f]], cellsize=cellsize, phasecentre=phasecentre, polarisation_frame=PolarisationFrame("stokesI"), flux_limit=1.0, applybeam=True) for f, freq in enumerate(frequency)] log.info('About to make GLEAM model') gleam_model = arlexecute.compute(gleam_model, sync=True) future_gleam_model = arlexecute.scatter(gleam_model) .. code:: ipython3 log.info('About to run predict to get predicted visibility') future_vis_graph = arlexecute.scatter(vis_list) predicted_vislist = predict_list_arlexecute_workflow(future_vis_graph, gleam_model, context='wstack', vis_slices=vis_slices) predicted_vislist = arlexecute.compute(predicted_vislist, sync=True) corrupted_vislist = corrupt_list_arlexecute_workflow(predicted_vislist, phase_error=1.0) log.info('About to run corrupt to get corrupted visibility') corrupted_vislist = arlexecute.compute(corrupted_vislist, sync=True) future_predicted_vislist=arlexecute.scatter(predicted_vislist) .. parsed-literal:: /home/jenkins-slave/workspace/ce-library_feature-improved-docs/_build/lib/python3.5/site-packages/distributed/worker.py:3235: UserWarning: Large object of size 2.10 MB detected in task graph: ('getitem-935182402f32bdac003d4310332f841e',