Workflows¶
Execution¶
Execution (optionally via Dask)¶
Execute wraps Dask so that, with the same code, Dask.delayed evaluation can be replaced by immediate calculation.
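A minimal sketch of this pattern (the class and method names below are hypothetical illustrations, not the actual arlexecute API): one wrapper lets identical calling code run either eagerly or via Dask.delayed.

```python
# Hypothetical sketch: a single wrapper that switches between Dask.delayed
# and immediate calculation without changing the calling code.
class ExecuteWrapper:
    def __init__(self, use_dask=False):
        self.use_dask = use_dask

    def execute(self, func):
        if self.use_dask:
            from dask import delayed  # only imported in Dask mode
            return delayed(func)
        return func  # immediate calculation

    def compute(self, value):
        # In Dask mode, trigger evaluation of the graph; otherwise pass through.
        return value.compute() if self.use_dask else value

# The calling code is identical in either mode:
ex = ExecuteWrapper(use_dask=False)
graph = ex.execute(lambda a, b: a + b)(2, 3)
result = ex.compute(graph)  # 5
```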
Dask init¶
Initialise Dask
-
get_dask_Client
(timeout=30, n_workers=None, threads_per_worker=1, processes=True, create_cluster=True, memory_limit=None)[source]¶ Get a Dask.distributed Client for the scheduler defined externally, otherwise create one.
If the environment variable ARL_DASK_SCHEDULER is set, it is interpreted as pointing to the scheduler, and a client using that scheduler is returned. Otherwise a new client is created.
Parameters: - timeout – Time out for creation
- n_workers – Number of workers
- threads_per_worker – Number of threads per worker
- processes – Use processes instead of threads
- create_cluster – Create a LocalCluster
- memory_limit – Memory limit per worker (bytes e.g. 8e9)
Returns: Dask client
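The scheduler-selection logic described above can be sketched as follows (the function name and return shape here are hypothetical; only the ARL_DASK_SCHEDULER variable comes from the documentation):

```python
import os

# Hypothetical sketch of the selection logic: if ARL_DASK_SCHEDULER is set,
# connect to that external scheduler; otherwise create a local cluster.
def select_scheduler(environ=os.environ):
    address = environ.get("ARL_DASK_SCHEDULER")
    if address:
        # The real code would do: Client(address, timeout=timeout)
        return ("external", address)
    # The real code would do: Client(LocalCluster(n_workers=...,
    #     threads_per_worker=..., processes=..., memory_limit=...))
    return ("local", None)

mode, addr = select_scheduler({"ARL_DASK_SCHEDULER": "tcp://scheduler:8786"})
```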
Pipelines¶
Pipelines using arlexecute¶
SDP pipelines as processing components.
-
continuum_imaging_workflow
(vis_list, model_imagelist, context='2d', **kwargs)[source]¶ Create graph for the continuum imaging pipeline.
Same as ICAL but with no selfcal.
Parameters: - vis_list – List of visibilities
- model_imagelist – Model image list
- context – Imaging context
- kwargs – Parameters for functions in components
Returns:
-
ical_workflow
(vis_list, model_imagelist, context='2d', calibration_context='TG', do_selfcal=True, **kwargs)[source]¶ Create graph for ICAL pipeline
Parameters: - vis_list – List of visibilities
- model_imagelist – Model image list
- context – imaging context e.g. ‘2d’
- calibration_context – Sequence of calibration steps e.g. TGB
- do_selfcal – Do the selfcalibration?
- kwargs – Parameters for functions in components
Returns:
-
spectral_line_imaging_workflow
(vis_list, model_imagelist, continuum_model_imagelist=None, context='2d', **kwargs)[source]¶ Create graph for spectral line imaging pipeline
Uses the ical pipeline after subtraction of a continuum model
Parameters: - vis_list – List of visibility components
- model_imagelist – Spectral line model graph
- continuum_model_imagelist – Continuum model list
- context – Imaging context
- kwargs – Parameters for functions in components
Returns: (deconvolved model, residual, restored)
Support¶
Pipelines expressed as dask components
-
corrupt_workflow
(vis_list, gt_list=None, **kwargs)[source]¶ Create a graph to apply gain errors to a vis_list
Parameters: - vis_list – List of visibilities to be corrupted
- gt_list – Optional gain table graph
- kwargs – Parameters for functions in components
Returns:
-
simulate_workflow
(config='LOWBD2', phasecentre=<SkyCoord (ICRS): (ra, dec) in deg (15., -60.)>, frequency=None, channel_bandwidth=None, times=None, polarisation_frame=<data_models.polarisation.PolarisationFrame object>, order='frequency', format='blockvis', rmax=1000.0, zerow=False)[source]¶ A component to simulate an observation
The simulation step can generate a single BlockVisibility or a list of BlockVisibility’s. The order keyword determines the way that the list is constructed. If order=’frequency’ then len(frequency) BlockVisibility’s, each with all times, are created. If order=’time’ then len(times) BlockVisibility’s, each with all frequencies, are created. If order=’both’ then len(times) * len(frequency) BlockVisibility’s are created, each with a single time and frequency. If order=None then all data are created in one BlockVisibility.
The output format can be either ‘blockvis’ (for calibration) or ‘vis’ (for imaging)
Parameters: - config – Name of configuration: def ‘LOWBD2’
- phasecentre – Phase centre def: SkyCoord(ra=+15.0 * u.deg, dec=-60.0 * u.deg, frame=’icrs’, equinox=’J2000’)
- frequency – def [1e8]
- channel_bandwidth – def [1e6]
- times – Observing times in radians: def [0.0]
- polarisation_frame – def PolarisationFrame(“stokesI”)
- order – ‘time’ or ‘frequency’ or ‘both’ or None: def ‘frequency’
- format – ‘blockvis’ or ‘vis’: def ‘blockvis’
Returns: vis_list with different frequencies in different elements
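The effect of the order keyword on the length of the returned list can be illustrated as follows (counts only; the helper name is hypothetical):

```python
# Hypothetical helper illustrating how many BlockVisibility objects
# simulate_workflow returns for each value of the order keyword.
def n_blockvis(order, n_times, n_frequencies):
    if order == 'frequency':
        return n_frequencies            # each holds one frequency, all times
    if order == 'time':
        return n_times                  # each holds one time, all frequencies
    if order == 'both':
        return n_times * n_frequencies  # one per (time, frequency) pair
    return 1                            # order=None: all data in one BlockVisibility
```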
Processing Component Interface¶
Wrappers¶
The python script processing_component_interface enables running ARL components from another environment such as bash.
A bash script example for a continuum imaging pipeline is:
#!/usr/bin/env bash
cd ${ARL}/workflows/wrapped
comp_location=${ARL}/processing_components/processing_component_interface
python ${comp_location}/processing_component_interface.py --config gleam_create_vislist.json
python ${comp_location}/processing_component_interface.py --config gleam_create_skymodel.json
python ${comp_location}/processing_component_interface.py --config gleam_predict_vislist.json
python ${comp_location}/processing_component_interface.py --config gleam_continuum_imaging.json
To be available in this way, a component must be wrapped appropriately and placed in processing_component_wrappers.py. For example, here is a simple wrapper:
def corrupt_vislist_wrapper(conf):
    vis_list = buffer_data_model_to_memory(conf["buffer"], conf['inputs']['vis_list'])
    phase_error = json_to_quantity(conf['corrupt_vislist']['phase_error']).to('rad').value

    corrupted_vislist = corrupt_workflow(vis_list,
                                         phase_error=phase_error,
                                         amplitude_error=conf['corrupt_vislist']['amplitude_error'])

    return arlexecute.execute(memory_data_model_to_buffer)(corrupted_vislist, conf["buffer"],
                                                           conf['outputs']['vis_list'])
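The json_to_quantity(...).to('rad').value call above converts a {"value": ..., "unit": ...} dict into radians. A deliberately simplified, hypothetical stand-in (the real code uses astropy.units and accepts any angular unit) looks like this:

```python
import math

# Hypothetical simplified stand-in for json_to_quantity(...).to('rad').value,
# handling only the 'deg' and 'rad' cases.
def quantity_to_rad(q):
    if q["unit"] == "deg":
        return math.radians(q["value"])
    if q["unit"] == "rad":
        return q["value"]
    raise ValueError("unsupported unit: %s" % q["unit"])

phase_error = quantity_to_rad({"value": 180.0, "unit": "deg"})  # math.pi
```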
The wrapper for predict is:
def predict_vislist_wrapper(conf):
    vis_list = buffer_data_model_to_memory(conf["buffer"], conf['inputs']['vis_list'])
    skymodel = buffer_data_model_to_memory(conf["buffer"], conf['inputs']['skymodel'])

    flux_limit = conf['primary_beam']['flux_limit']

    if conf["primary_beam"]["apply"]:
        def apply_pb_image(vt, model):
            telescope = vt.configuration.name
            pb = create_pb(model, telescope)
            model.data *= pb.data
            return model

        def apply_pb_comp(vt, model, comp):
            telescope = vt.configuration.name
            pb = create_pb(model, telescope)
            return apply_beam_to_skycomponent(comp, pb, flux_limit)

        image_list = [arlexecute.execute(apply_pb_image, nout=1)(v, skymodel.images[i])
                      for i, v in enumerate(vis_list)]
        if len(skymodel.components) > 1:
            component_list = [arlexecute.execute(apply_pb_comp, nout=1)(v, skymodel.images[i], skymodel.components)
                              for i, v in enumerate(vis_list)]
        else:
            component_list = []
    else:
        image_list = [skymodel.images[0] for v in vis_list]
        component_list = skymodel.components

    future_vis_list = arlexecute.scatter(vis_list)
    predicted_vis_list = [arlexecute.execute(predict_skycomponent_visibility)(v, component_list)
                          for v in future_vis_list]
    predicted_vis_list = predict_workflow(predicted_vis_list, image_list,
                                          context=conf['imaging']['context'],
                                          vis_slices=conf['imaging']['vis_slices'])

    return arlexecute.execute(memory_data_model_to_buffer)(predicted_vis_list, conf["buffer"],
                                                           conf["outputs"]["vis_list"])
The JSON files contain the name of the component to be run and all the parameters necessary. An example of a JSON file is:
{
"execute": {
"use_dask": true,
"n_workers": 4,
"memory_limit": 4000000000
},
"component": {
"framework": "ARL",
"name": "predict_vislist"
},
"logging": {
"filename": "test_pipeline.log",
"filemode": "a",
"format": "%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s",
"datefmt": "%H:%M:%S",
"level": "INFO"
},
"buffer": {
"directory": "test_results/"
},
"inputs": {
"skymodel": {
"name":"test_skymodel.hdf",
"data_model": "SkyModel"
},
"vis_list": {
"name": "test_empty_vislist.hdf",
"data_model": "BlockVisibility"
}
},
"outputs": {
"vis_list": {
"name": "test_perfect_vislist.hdf",
"data_model": "BlockVisibility"
}
},
"imaging": {
"context": "wstack",
"vis_slices": 11
},
"primary_beam": {
"apply": true,
"flux_limit" : {"value": 0.01, "unit":"Jy"}
}
}
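The loading-and-dispatch step implied by the "component" block can be sketched as follows (the dispatch table and stub wrappers are hypothetical; the real code also validates the dict against arl_schema and configures logging and Dask first):

```python
import json

# Hypothetical sketch: read a config like the one above and dispatch on the
# component name to the matching wrapper function.
config = json.loads('''
{
  "component": {"framework": "ARL", "name": "predict_vislist"},
  "logging": {"level": "INFO"}
}
''')

wrappers = {
    "predict_vislist": lambda conf: "ran predict_vislist",
    "create_vislist": lambda conf: "ran create_vislist"
}
result = wrappers[config["component"]["name"]](config)  # "ran predict_vislist"
```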
The parameters for the component are passed via a JSON file, either via python:
component_wrapper("gleam_continuum_imaging.json")
or the dict derived from JSON may be passed directly:
config = initialise_config_wrapper("gleam_continuum_imaging.json")
component_wrapper(config)
or from bash:
python component_wrapper --config "gleam_continuum_imaging.json"
Examples of json files are in tests/workflows/.
ARL JSON schema¶
Definition of ARL JSON schema and helper functions
The ARL JSON schema contains various definitions. Some are for logging and execution, some are for standard uses (such as image and imaging), some are helper definitions (such as linspace), and some are specific to the processing_component_interface.
Compliance with the schema is checked on loading. Definitions not present in the schema are allowed. Finally, no default handling is currently present, so all fields must be defined.
Helper functions for converting JSON to ARL objects
-
json_to_linspace
(l)[source]¶ Convert JSON string to numpy.linspace
e.g. “frequency”: {“start”: 0.9e8,”stop”: 1.1e8,”steps”: 7}
Parameters: l – JSON definition containing ‘start’, ‘stop’ and ‘steps’ Returns: numpy array
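For the example above, the conversion amounts to the following (a sketch; the real function may also validate its input):

```python
import numpy

# Sketch of json_to_linspace: map a {"start", "stop", "steps"} dict onto
# numpy.linspace.
def json_to_linspace(l):
    return numpy.linspace(l["start"], l["stop"], l["steps"])

frequency = json_to_linspace({"start": 0.9e8, "stop": 1.1e8, "steps": 7})
```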
Component wrapper¶
This is the processing wrapper interface into the ARL, allowing access to wrapped components.
Processing component wrapper¶
Wrappers around ARL processing components, using JSON for configuration.
These can be executed using processing_workflow_interface.py.
-
create_skymodel_wrapper
(conf)[source]¶ Wrapper to create skymodel
Parameters: conf – Configuration from JSON file Returns:
-
create_vislist_wrapper
(conf)[source]¶ Create an empty vislist
Parameters: conf – Configuration from JSON file Returns:
Execution helpers¶
Wrappers needed for execution
-
initialise_config_wrapper
(config_file)[source]¶ Obtain the configuration from a JSON file, validating against arl_schema
Parameters: config_file – Name of file containing JSON configuration Returns: configuration