Workflows

Execution

Execution (optionally via Dask)

Execute wraps Dask so that the same code can run either deferred via Dask.delayed or as an immediate calculation.
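The idea of one calling convention with two execution modes can be sketched as follows. This is a minimal illustration only, not the arlexecute implementation: MiniExecute and Deferred are hypothetical names, and Deferred is a pure-Python stand-in for a Dask.delayed object so that the sketch runs without Dask installed.

```python
class Deferred:
    """Stand-in for a delayed object: stores a call and runs it on compute()."""

    def __init__(self, func, args, kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs

    def compute(self):
        # Resolve any deferred arguments first, then run the function.
        args = [a.compute() if isinstance(a, Deferred) else a for a in self.args]
        kwargs = {k: (v.compute() if isinstance(v, Deferred) else v)
                  for k, v in self.kwargs.items()}
        return self.func(*args, **kwargs)


class MiniExecute:
    """Hypothetical wrapper: the calling code is identical in both modes."""

    def __init__(self, use_dask=True):
        self.use_dask = use_dask

    def execute(self, func):
        if self.use_dask:
            # Deferred mode: record the call instead of running it.
            return lambda *args, **kwargs: Deferred(func, args, kwargs)
        # Immediate mode: hand back the function unchanged.
        return func

    def compute(self, value):
        # In immediate mode the value is already the result.
        return value.compute() if isinstance(value, Deferred) else value


def add(a, b):
    return a + b


def run(executor):
    # The same user code, whichever mode the executor is in.
    total = executor.execute(add)(executor.execute(add)(1, 2), 3)
    return executor.compute(total)


lazy_result = run(MiniExecute(use_dask=True))
eager_result = run(MiniExecute(use_dask=False))
```

Both calls to run give the same value; only the moment at which add is actually evaluated differs.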

Dask init

Initialise dask

findNodes(c)[source]

Find Nodes being used for this Client

get_dask_Client(timeout=30, n_workers=None, threads_per_worker=1, processes=True, create_cluster=True, memory_limit=None)[source]

Get a Dask.distributed Client for an externally defined scheduler, or create one otherwise.

If the environment variable ARL_DASK_SCHEDULER is set, it is interpreted as pointing to the scheduler and a client using that scheduler is returned. Otherwise a new client is created.

Parameters:
  • timeout – Time out for creation
  • n_workers – Number of workers
  • threads_per_worker – Number of threads per worker
  • processes – Use processes instead of threads
  • create_cluster – Create a LocalCluster
  • memory_limit – Memory limit per worker (bytes e.g. 8e9)
Returns:

Dask client
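The choice between an external scheduler and a newly created client might look roughly like the sketch below. It only returns a description of the decision so that it runs without Dask; choose_client_plan is a hypothetical helper, and the way the remaining options are forwarded is an assumption, not the behaviour of get_dask_Client.

```python
import os


def choose_client_plan(environ=None, n_workers=None, threads_per_worker=1,
                       processes=True, create_cluster=True, memory_limit=None):
    """Describe how a client would be obtained (hypothetical helper)."""
    environ = os.environ if environ is None else environ
    scheduler = environ.get("ARL_DASK_SCHEDULER")
    if scheduler:
        # An external scheduler is defined: a client would connect to it.
        return {"mode": "external", "address": scheduler}
    # No external scheduler: a client would be created from these options.
    return {
        "mode": "create",
        "options": {
            "n_workers": n_workers,
            "threads_per_worker": threads_per_worker,
            "processes": processes,
            "create_cluster": create_cluster,
            "memory_limit": memory_limit,
        },
    }


external = choose_client_plan({"ARL_DASK_SCHEDULER": "tcp://10.0.0.1:8786"})
local = choose_client_plan({}, n_workers=4, memory_limit=8e9)
```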

get_nodes()[source]

Get the nodes being used

The environment variable ARL_HOSTFILE is interpreted as the name of a file containing the nodes.

Returns:List of strings
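A minimal sketch of reading nodes from a host file follows. It assumes one node name per line and returns an empty list when ARL_HOSTFILE is unset; both are assumptions for illustration, and read_nodes is a hypothetical name rather than the library function.

```python
import os
import tempfile


def read_nodes(environ=None):
    """Return the node names listed in the file named by ARL_HOSTFILE."""
    environ = os.environ if environ is None else environ
    hostfile = environ.get("ARL_HOSTFILE")
    if hostfile is None:
        # Assumption: no host file means no nodes are known.
        return []
    with open(hostfile) as f:
        # Assumption: one node per line, blank lines ignored.
        return [line.strip() for line in f if line.strip()]


# Usage with a temporary host file
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "hostfile.txt")
    with open(path, "w") as f:
        f.write("node-a\nnode-b\n\n")
    nodes = read_nodes({"ARL_HOSTFILE": path})
```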

Pipelines

Pipelines using arlexecute

SDP pipelines as processing components.

continuum_imaging_workflow(vis_list, model_imagelist, context='2d', **kwargs)[source]

Create graph for the continuum imaging pipeline.

Same as ICAL but with no selfcal.

Parameters:
  • vis_list
  • model_imagelist
  • context – Imaging context
  • kwargs – Parameters for functions in components
Returns:

ical_workflow(vis_list, model_imagelist, context='2d', calibration_context='TG', do_selfcal=True, **kwargs)[source]

Create graph for ICAL pipeline

Parameters:
  • vis_list
  • model_imagelist
  • context – imaging context e.g. ‘2d’
  • calibration_context – Sequence of calibration steps e.g. TGB
  • do_selfcal – Do the selfcalibration?
  • kwargs – Parameters for functions in components
Returns:

spectral_line_imaging_workflow(vis_list, model_imagelist, continuum_model_imagelist=None, context='2d', **kwargs)[source]

Create graph for spectral line imaging pipeline

Uses the ical pipeline after subtraction of a continuum model

Parameters:
  • vis_list – List of visibility components
  • model_imagelist – Spectral line model graph
  • continuum_model_imagelist – Continuum model list
  • context – Imaging context
  • kwargs – Parameters for functions in components
Returns:

(deconvolved model, residual, restored)

Support

Pipelines expressed as dask components

corrupt_workflow(vis_list, gt_list=None, **kwargs)[source]

Create a graph to apply gain errors to a vis_list

Parameters:
  • vis_list
  • gt_list – Optional gain table graph
  • kwargs
Returns:

simulate_workflow(config='LOWBD2', phasecentre=<SkyCoord (ICRS): (ra, dec) in deg (15., -60.)>, frequency=None, channel_bandwidth=None, times=None, polarisation_frame=<data_models.polarisation.PolarisationFrame object>, order='frequency', format='blockvis', rmax=1000.0, zerow=False)[source]

A component to simulate an observation

The simulation step can generate a single BlockVisibility or a list of BlockVisibility objects. The order parameter determines how the list is constructed. If order='frequency' then len(frequency) BlockVisibility objects, each with all times, are created. If order='time' then len(times) BlockVisibility objects, each with all frequencies, are created. If order='both' then len(times) * len(frequency) BlockVisibility objects are created, each with a single time and frequency. If order=None then all data are created in one BlockVisibility.

The output format can be either 'blockvis' (for calibration) or 'vis' (for imaging).

Parameters:
  • config – Name of configuration: default 'LOWBD2'
  • phasecentre – Phase centre: default SkyCoord(ra=+15.0 * u.deg, dec=-60.0 * u.deg, frame='icrs', equinox='J2000')
  • frequency – default [1e8]
  • channel_bandwidth – default [1e6]
  • times – Observing times in radians: default [0.0]
  • polarisation_frame – default PolarisationFrame("stokesI")
  • order – 'time' or 'frequency' or 'both' or None: default 'frequency'
  • format – 'blockvis' or 'vis': default 'blockvis'
Returns:

vis_list with different frequencies in different elements
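How the order parameter partitions the data can be illustrated with a small stand-alone sketch. split_by_order is a hypothetical helper that returns the (times, frequency) selection for each BlockVisibility that would be created; the loop order used for 'both' is an assumption.

```python
def split_by_order(times, frequency, order="frequency"):
    """Return one (times, frequency) pair per BlockVisibility to be created."""
    if order == "frequency":
        # One element per frequency, each with all times.
        return [(list(times), [f]) for f in frequency]
    if order == "time":
        # One element per time, each with all frequencies.
        return [([t], list(frequency)) for t in times]
    if order == "both":
        # One element per (time, frequency) combination.
        return [([t], [f]) for t in times for f in frequency]
    if order is None:
        # Everything in a single element.
        return [(list(times), list(frequency))]
    raise ValueError("order must be 'time', 'frequency', 'both' or None")


times = [0.0, 0.1, 0.2]
frequency = [1.0e8, 1.1e8]
counts = {order: len(split_by_order(times, frequency, order))
          for order in ("frequency", "time", "both", None)}
```

With three times and two frequencies, counts holds the number of elements produced by each setting of order.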

Processing Component Interface

Wrappers

The Python script processing_component_interface enables running ARL components from another environment such as bash.

A bash script example for a continuum imaging pipeline is:

#!/usr/bin/env bash
cd ${ARL}/workflows/wrapped
comp_location=${ARL}/processing_components/processing_component_interface
python ${comp_location}/processing_component_interface.py --config gleam_create_vislist.json
python ${comp_location}/processing_component_interface.py --config gleam_create_skymodel.json
python ${comp_location}/processing_component_interface.py --config gleam_predict_vislist.json
python ${comp_location}/processing_component_interface.py --config gleam_continuum_imaging.json

To be available in this way, a component must be wrapped appropriately and placed in processing_component_wrappers.py. For example, here is a simple wrapper:

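def corrupt_vislist_wrapper(conf):
    # Load the input visibility list from the buffer
    vis_list = buffer_data_model_to_memory(conf["buffer"], conf['inputs']['vis_list'])
    # Convert the JSON phase error to a plain value in radians
    phase_error = json_to_quantity(conf['corrupt_vislist']['phase_error']).to('rad').value

    # Build the graph that applies the gain errors
    corrupted_vislist = corrupt_workflow(vis_list,
                                          phase_error=phase_error,
                                          amplitude_error=conf['corrupt_vislist']['amplitude_error'])

    # Write the corrupted visibilities to the output named in the configuration
    return arlexecute.execute(memory_data_model_to_buffer)(corrupted_vislist, conf["buffer"], conf['outputs']['vis_list'])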

The wrapper for predict is:

def predict_vislist_wrapper(conf):
    # Load the visibilities and the sky model from the buffer
    vis_list = buffer_data_model_to_memory(conf["buffer"], conf['inputs']['vis_list'])
    skymodel = buffer_data_model_to_memory(conf["buffer"], conf['inputs']['skymodel'])

    flux_limit = conf['primary_beam']['flux_limit']

    if conf["primary_beam"]["apply"]:
        def apply_pb_image(vt, model):
            # Multiply the model image by the primary beam of the telescope
            telescope = vt.configuration.name
            pb = create_pb(model, telescope)
            model.data *= pb.data
            return model

        def apply_pb_comp(vt, model, comp):
            # Apply the primary beam to the sky components, subject to flux_limit
            telescope = vt.configuration.name
            pb = create_pb(model, telescope)
            return apply_beam_to_skycomponent(comp, pb, flux_limit)

        # One beam-corrected image (and component list) per visibility
        image_list = [arlexecute.execute(apply_pb_image, nout=1)(v, skymodel.images[i]) for i, v in enumerate(vis_list)]
        if len(skymodel.components) > 1:
            component_list = [arlexecute.execute(apply_pb_comp, nout=1)(v, skymodel.images[i], skymodel.components)
                            for i, v in enumerate(vis_list)]
        else:
            component_list = []
    else:
        # No primary beam: reuse the first model image for every visibility
        image_list = [skymodel.images[0] for v in vis_list]
        component_list = skymodel.components

    # Predict from the components first, then pass the result to predict_workflow
    future_vis_list = arlexecute.scatter(vis_list)
    predicted_vis_list = [arlexecute.execute(predict_skycomponent_visibility)(v, component_list)
                          for v in future_vis_list]
    predicted_vis_list = predict_workflow(predicted_vis_list, image_list,
                                           context=conf['imaging']['context'],
                                           vis_slices=conf['imaging']['vis_slices'])

    # Write the predicted visibilities to the output named in the configuration
    return arlexecute.execute(memory_data_model_to_buffer)(predicted_vis_list, conf["buffer"],
                                                         conf["outputs"]["vis_list"])

The JSON files contain the name of the component to be run and all the parameters necessary. An example of a JSON file is:

{
    "execute": {
        "use_dask": true,
        "n_workers": 4,
        "memory_limit": 4000000000
    },
    "component": {
        "framework": "ARL",
        "name": "predict_vislist"
    },
    "logging": {
        "filename": "test_pipeline.log",
        "filemode": "a",
        "format": "%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s",
        "datefmt": "%H:%M:%S",
        "level": "INFO"
    },
    "buffer": {
        "directory": "test_results/"
    },

    "inputs": {
        "skymodel": {
            "name":"test_skymodel.hdf",
            "data_model": "SkyModel"
        },
        "vis_list": {
            "name": "test_empty_vislist.hdf",
            "data_model": "BlockVisibility"
        }
    },
    "outputs": {
        "vis_list": {
            "name": "test_perfect_vislist.hdf",
            "data_model": "BlockVisibility"
        }
    },
    "imaging": {
        "context": "wstack",
        "vis_slices": 11
    },
    "primary_beam": {
        "apply": true,
        "flux_limit" : {"value": 0.01, "unit":"Jy"}
    }
}
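Since the JSON file names the component to run, one plausible way to select the wrapper is a lookup by name. The sketch below assumes the wrapper is called component name + "_wrapper", which matches the names used on this page (predict_vislist and predict_vislist_wrapper) but is not confirmed as the library's dispatch mechanism. The wrapper bodies, the WRAPPERS table and run_component are placeholders.

```python
import json


def predict_vislist_wrapper(conf):
    # Placeholder body: the real wrapper runs the prediction.
    return "predict -> " + conf["outputs"]["vis_list"]["name"]


def create_vislist_wrapper(conf):
    # Placeholder body: the real wrapper creates an empty vislist.
    return "create -> " + conf["outputs"]["vis_list"]["name"]


# Hypothetical lookup table from wrapper name to function
WRAPPERS = {f.__name__: f for f in (predict_vislist_wrapper, create_vislist_wrapper)}


def run_component(conf):
    """Look up the wrapper named in the configuration and call it."""
    name = conf["component"]["name"]
    try:
        wrapper = WRAPPERS[name + "_wrapper"]
    except KeyError:
        raise ValueError("No wrapper registered for component %r" % name) from None
    return wrapper(conf)


conf = json.loads("""
{
    "component": {"framework": "ARL", "name": "predict_vislist"},
    "outputs": {"vis_list": {"name": "test_perfect_vislist.hdf",
                             "data_model": "BlockVisibility"}}
}
""")
result = run_component(conf)
```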

The parameters for the component are passed via a JSON file, either via python:

component_wrapper("gleam_continuum_imaging.json")

or the dict derived from JSON may be passed directly:

config = initialise_config_wrapper("gleam_continuum_imaging.json")
component_wrapper(config)

or from bash:

python component_wrapper --config "gleam_continuum_imaging.json"

Examples of JSON files are in tests/workflows/.
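The three ways of supplying the configuration (file name, dict, command line) can be sketched with the standard library alone. load_config and build_parser are hypothetical helpers; only the --config option name is taken from the bash example above.

```python
import argparse
import json


def load_config(config):
    """Accept either a dict derived from JSON or the name of a JSON file."""
    if isinstance(config, dict):
        return config
    with open(config) as f:
        return json.load(f)


def build_parser():
    """Command-line parser with a single --config option."""
    parser = argparse.ArgumentParser(description="Run a wrapped component")
    parser.add_argument("--config", required=True,
                        help="Name of the JSON configuration file")
    return parser


# Equivalent to passing: --config gleam_continuum_imaging.json
args = build_parser().parse_args(["--config", "gleam_continuum_imaging.json"])
config_name = args.config
```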

ARL JSON schema

Definition of ARL JSON schema and helper functions

The ARL JSON schema contains various definitions. Some are for logging and execution, some are for standard uses (such as image and imaging), some are helper definitions (like linspace), and some are specific to particular processing component wrappers.

Compliance with the schema is checked on loading. Definitions not present in the schema are allowed. Finally, no default handling is currently present, so all fields must be defined.
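Because no defaults are filled in, a configuration that omits a field fails rather than falling back to a value, while extra definitions pass through. The sketch below illustrates that behaviour with a plain-Python check; it is a stand-in for schema validation, not the schema itself, and the REQUIRED list and missing_fields helper are hypothetical.

```python
# Hypothetical list of required fields, written as dotted paths
REQUIRED = ["execute.use_dask", "component.name", "buffer.directory"]


def missing_fields(conf, required=REQUIRED):
    """Return the dotted paths of required fields that are absent from conf."""
    missing = []
    for path in required:
        node = conf
        for key in path.split("."):
            if not isinstance(node, dict) or key not in node:
                missing.append(path)
                break
            node = node[key]
    return missing


complete = {
    "execute": {"use_dask": True},
    "component": {"framework": "ARL", "name": "predict_vislist"},
    "buffer": {"directory": "test_results/"},
    "my_extra_block": {"anything": 1},  # extra definitions are allowed
}
incomplete = {"execute": {}, "component": {"name": "predict_vislist"}}

ok = missing_fields(complete)
problems = missing_fields(incomplete)
```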

Helper functions for converting JSON to ARL objects

json_to_linspace(l)[source]

Convert JSON string to numpy.linspace

e.g. "frequency": {"start": 0.9e8, "stop": 1.1e8, "steps": 7}

Parameters:l
Returns:

json_to_quantity(q)[source]

Convert JSON string to Quantity

e.g. "cellsize": {"value": 0.001, "unit": "rad"}

Parameters:q
Returns:

json_to_skycoord(d)[source]

Convert JSON string to SkyCoord

e.g. "phasecentre": {"ra": {"value": 30.0, "unit": "deg"}, "dec": {"value": -60.0, "unit": "deg"}, "frame": "icrs", "equinox": "j2000"}

Parameters:d
Returns:
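As an illustration of these helpers, the conversion done by json_to_linspace can be sketched without NumPy. linspace_from_json is a hypothetical pure-Python stand-in that returns a list and, like numpy.linspace with its default settings, includes both end points; the real helper is documented as producing a numpy.linspace.

```python
def linspace_from_json(l):
    """Evenly spaced values from a dict with "start", "stop" and "steps"."""
    start, stop, steps = float(l["start"]), float(l["stop"]), int(l["steps"])
    if steps == 1:
        # A single step yields just the start value.
        return [start]
    step = (stop - start) / (steps - 1)
    return [start + i * step for i in range(steps)]


# The "frequency" example from json_to_linspace above
frequency = linspace_from_json({"start": 0.9e8, "stop": 1.1e8, "steps": 7})
```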

Component wrapper

This is the processing wrapper interface into the ARL, allowing access to wrapped components.

component_wrapper(config)[source]

Run an ARL component as described in a JSON dict or file

Parameters:config – JSON dict or file
Returns:

Processing component wrapper

Wrappers around ARL processing components, using JSON for configuration.

These can be executed using processing_component_interface.py.

continuum_imaging_wrapper(conf)[source]

Wrap continuum imaging pipeline

Parameters:conf
Returns:

corrupt_vislist_wrapper(conf)[source]

Wrapper for corruption

Parameters:conf
Returns:

create_skymodel_wrapper(conf)[source]

Wrapper to create skymodel

Parameters:conf – Configuration from JSON file
Returns:

create_vislist_wrapper(conf)[source]

Create an empty vislist

Parameters:conf – Configuration from JSON file
Returns:

ical_wrapper(conf)[source]

Wrap ICAL pipeline

Parameters:conf – Configuration from JSON file
Returns:

predict_vislist_wrapper(conf)[source]

Wrapper for prediction

Parameters:conf – Configuration from JSON file
Returns:

Execution helpers

Wrappers needed for execution

initialise_config_wrapper(config_file)[source]

Obtain the configuration from a JSON file, validating against arl_schema

Parameters:config_file – Name of file containing JSON configuration
Returns:configuration

initialise_execution_wrapper(conf)[source]

Initialise the execution framework from JSON configuration

See arl_schema.json

Parameters:conf – JSON configuration

initialise_logging_wrapper(conf)[source]

Initialise logging from JSON configuration

See arl_schema.json

Parameters:conf – JSON configuration
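The "logging" block of the JSON example uses field names that match the keyword arguments of Python's logging.basicConfig, so one plausible mapping is sketched below. Whether initialise_logging_wrapper works this way is an assumption, and logging_kwargs is a hypothetical helper.

```python
import logging


def logging_kwargs(conf):
    """Translate the "logging" block of a configuration into basicConfig arguments."""
    log = conf["logging"]
    return {
        "filename": log["filename"],
        "filemode": log["filemode"],
        "format": log["format"],
        "datefmt": log["datefmt"],
        # Convert the level name (e.g. "INFO") to the numeric logging level.
        "level": getattr(logging, log["level"]),
    }


conf = {
    "logging": {
        "filename": "test_pipeline.log",
        "filemode": "a",
        "format": "%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s",
        "datefmt": "%H:%M:%S",
        "level": "INFO",
    }
}
kwargs = logging_kwargs(conf)
# To apply the settings: logging.basicConfig(**kwargs)
```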