# $Id: oi_fits.py,v 1.6 2004/03/01 10:35:33 jsy1001 Exp $ """Example python module for FITS optical interferometry exchange format. Implements April 2003 release of the Standard. This module is structured as a set of python classes, instances of which provide object-oriented access to data from an OI-FITS file (which may not exist on disk yet!). The classes are as follows: OI_FITS -- Instance stores data for entire OI-FITS file The data attributes of the OI_FITS instance are instances of other classes, or lists/dictionaries of these. Each of the these 'table classes' corresponds to one of the FITS binary tables defined in the Standard: Array -- OI_ARRAY table Target -- OI_TARGET table Wavelength -- OI_WAVELENGTH table Vis -- OI_VIS table Vis2 -- OI_VIS2 table T3 -- OI_T3 table The table classes typically have data attributes that are instances of lower-level classes, corresponding to rows of the FITS table. These 'row classes' generally don't have any method attributes. The entire FITS file corresponding to an OI_FITS instance may be read from/written to disk using the instance's FromHDUList() and ToHDUList() methods. These convert from/to a pyfits.HDUList instance (which represents data from a generic FITS file), as defined in the third-party pyfits module. pyfits provides many functions that can operate on HDUList instances (see the pyfits manual). A script to write an OI-FITS file would be structured as follows: import oi_fits oi = oi_fits.OI_FITS() # insert code to assign to data attributes oi.ToHDUList().writeto('output.fits') A script to read from an OI-FITS file would be structured like this: import oi_fits, pyfits hlist = pyfits.open('input.fits') oi = oi_fits.OI_FITS() oi.FromHDUList(hlist) hlist.close() # insert code to do something with oi's data attributes The _test function at the end of this file is a trivial example of reading and writing an OI-FITS file. Instances of table classes have FromTable() and ToTable() methods, which convert between the table class instance and a pyfits.BinTableHDU instance. These methods may be used when reading/writing individual tables, rather than the whole file. """ from numarray import * import pyfits, string # Exceptions raised by methods of these classes class OI_FITSError(Exception): """Base Exception class.""" pass class OI_FITSConformError(OI_FITSError): """Raised if pyfits.BinTableHDU does not conform to OI-FITS spec.""" pass class OI_FITSEmptyTableError(OI_FITSError): """Raised by ToTable() methods when table would have zero rows.""" class OI_FITS: """Data for entire OI_FITS file. Data attributes are: arrays -- Dictionary of Array instances, indexed by arrname target -- Target instance wavelengths -- Dictionary of Wavelength instances, indexed by insname vis -- List of Vis instances vis2 -- List of Vis2 instances t3 -- List of T3 instances Methods are: ToHDUList() -- return pyfits.HDUList instance FromHDUList(hlist) -- assign to data attributes from pyfits.HDUList instance verify() -- would ToHDUList().writeto() give a standard-conforming FITS file? """ def __init__(self): self.arrays = {} self.target = None self.wavelengths = {} self.vis = [] self.vis2 = [] self.t3 = [] def ToHDUList(self): """Return pyfits.HDUList instance.""" hlist = pyfits.HDUList() # Add primary HDU pri = pyfits.PrimaryHDU() hlist.append(pri) # Add binary table HDUs hlist.append(self.target.ToTable()) for i in range(len(self.wavelengths)): hlist.append(self.wavelengths.values()[i].ToTable(extver=i+1)) for i in range(len(self.arrays)): hlist.append(self.arrays.values()[i].ToTable(extver=i+1)) for i in range(len(self.vis)): hlist.append(self.vis[i].ToTable(extver=i+1)) for i in range(len(self.vis2)): hlist.append(self.vis2[i].ToTable(extver=i+1)) for i in range(len(self.t3)): hlist.append(self.t3[i].ToTable(extver=i+1)) return hlist def FromHDUList(self, hlist): """Assign to data attributes from pyfits.HDUList instance. May raise OI_FITSConformError. """ for hdu in hlist[1:]: extname = hdu.header['EXTNAME'] if extname == 'OI_WAVELENGTH': wavelength = Wavelength() wavelength.FromTable(hdu) if wavelength.insname in self.wavelengths.keys(): raise OI_FITSConformError, "wavelength table has non-unique insname" self.wavelengths[wavelength.insname] = wavelength elif extname == 'OI_ARRAY': array = Array() array.FromTable(hdu) if array.arrname in self.arrays.keys(): raise OI_FITSConformError, "array table has non-unique arrname" self.arrays[array.arrname] = array elif extname == 'OI_TARGET': self.target = Target() self.target.FromTable(hdu) elif extname == 'OI_VIS': vis = Vis() vis.FromTable(hdu) self.vis.append(vis) elif extname == 'OI_VIS2': vis2 = Vis2() vis2.FromTable(hdu) self.vis2.append(vis2) elif extname == 'OI_T3': t3 = T3() t3.FromTable(hdu) self.t3.append(t3) else: print 'Skipping non-standard %s table' % extname if not self.verify(): raise OI_FITSConformError, "Missing table or bad insname" def verify(self): """Would ToHDUList().writeto() give a standard-conforming FITS file? Checks tables present. """ data = self.vis+self.vis2+self.t3 if self.target is None or len(data) == 0: return 0 for obj in data: if not self.wavelengths.has_key(obj.insname): return 0 return 1 class Element: """Array element, corresponds to one row of an OI_ARRAY FITS table. Data attributes are: tel_name -- (string) Telescope name sta_name -- (string) Station name sta_index -- (int) Station number diameter -- (float) Element diameter (meters) staxyz -- (float 3-tuple) Station coords relative to array center (meters) """ pass class Array: """Data for OI_ARRAY FITS table. Data attributes are: revision -- (int) Revision number of the table definition arrname -- (string) Array name frame -- (string) Coordinate frame (always 'GEOCENTRIC') arrayx, arrayy, arrayz -- (float) Array center coordinates (meters) elements -- *Dictionary* of Element instances, indexed by sta_index Methods are: ToTable() -- Return pyfits.BinTableHDU instance FromTable() -- Copy to data attributes from pyfits.BinTableHDU instance """ def __init__(self): self.revision = 1 self.frame = 'GEOCENTRIC' self.elements = {} def ToTable(self, extver=1): """Return pyfits.BinTableHDU instance.""" if len(self.elements) == 0: raise OI_FITSEmptyTableError, "No elements in OI_ARRAY" cols = [] cols.append(pyfits.Column(name='TEL_NAME', format='16A')) cols.append(pyfits.Column(name='STA_NAME', format='16A')) cols.append(pyfits.Column(name='STA_INDEX', format='I')) cols.append(pyfits.Column(name='DIAMETER', format='E', unit='m')) cols.append(pyfits.Column(name='STAXYZ', format='3D', unit='m')) tabhdu = pyfits.new_table(cols, nrows=len(self.elements)) # Make list of Element instances, ordered by sta_index els = self.elements.values() els.sort(lambda a, b: cmp(a.sta_index, b.sta_index)) # Insert data irow = 0 for r in els: vals = [r.tel_name, r.sta_name, r.sta_index, r.diameter, array(r.staxyz)] for icol in range(len(cols)): tabhdu.data.field(icol)[irow] = vals[icol] irow += 1 tabhdu.name = 'OI_ARRAY' hdr = tabhdu.header hdr.update('EXTVER', extver, comment='ID number of this OI_ARRAY') hdr.update('OI_REVN', self.revision, comment='Revision number of the table definition') hdr.update('ARRNAME', self.arrname, comment='Array name') hdr.update('FRAME', self.frame, comment='Coordinate frame') hdr.update('ARRAYX', self.arrayx, comment='[m] Array center X coordinate') hdr.update('ARRAYY', self.arrayy, comment='[m] Array center Y coordinate') hdr.update('ARRAYZ', self.arrayz, comment='[m] Array center Z coordinate') return tabhdu def FromTable(self, tabhdu): """Copy to data attributes from pyfits.BinTableHDU instance. Values from non-standard columns get assigned to extra data attributes. """ try: self.revision = tabhdu.header['OI_REVN'] for keyword in ['ARRNAME', 'FRAME', 'ARRAYX', 'ARRAYY', 'ARRAYZ']: self.__dict__[string.lower(keyword)] = tabhdu.header[keyword] except KeyError: raise OI_FITSConformError, "Missing keyword" cols = tabhdu.get_coldefs() self.elements = {} for irow in range(len(tabhdu.data)): el = Element() for name in cols.names: el.__dict__[string.lower(name)] = tabhdu.data[irow].field(name) try: el.staxyz = tuple(el.staxyz) if self.elements.has_key(el.sta_index): raise OI_FITSConformError, "Duplicate sta_index" except AttributeError: raise OI_FITSConformError, "Missing column" self.elements[el.sta_index] = el class TargetRecord: """Info on an observing target. Corresponds to one row of an OI_TARGET FITS table. Data attributes are: target_id -- (int) Index number target -- (string) Target name raep0 -- (float) RA at mean equinox (degrees) decep0 -- (float) DEC at mean equinox (degrees) equinox -- (float) Equinox ra_err -- (float) Error in RA at mean equinox (degrees) dec_err -- (float) Error in RA at mean equinox (degrees) sysvel -- (float) Systemic radial velocity (meters per second) veltyp -- (string) Reference for radial velocity veldef -- (string) Definition of radial velocity pmra -- (float) Proper motion in RA (degrees per year) pmdec -- (float) Proper motion in DEC (degrees per year) pmra_err -- (float) Error of proper motion in RA (degrees per year) pmdec_err -- (float) Error of proper motion in DEC (degrees per year) parallax -- (float) Parallax (degrees) para_err -- (float) Error in parallax (degrees) spectyp -- (string) Spectral type """ pass class Target: """Data for OI_TARGET FITS table. Data attributes are: revision -- (int) Revision number of the table definition records -- *Dictionary* of TargetRecord instances, indexed by target_id Methods are: ToTable() -- Return pyfits.BinTableHDU instance FromTable() -- Copy to data attributes from pyfits.BinTableHDU instance """ def __init__(self): self.revision = 1 self.records = {} def ToTable(self): """Return pyfits.BinTableHDU instance.""" if len(self.records) == 0: raise OI_FITSEmptyTableError, "No records in OI_TARGET" cols = [] cols.append(pyfits.Column(name='TARGET_ID', format='I')) cols.append(pyfits.Column(name='TARGET', format='16A')) cols.append(pyfits.Column(name='RAEP0', format='D', unit='deg')) cols.append(pyfits.Column(name='DECEP0', format='D', unit='deg')) cols.append(pyfits.Column(name='EQUINOX', format='E', unit='yr')) cols.append(pyfits.Column(name='RA_ERR', format='D', unit='deg')) cols.append(pyfits.Column(name='DEC_ERR', format='D', unit='deg')) cols.append(pyfits.Column(name='SYSVEL', format='D', unit='m/s')) cols.append(pyfits.Column(name='VELTYP', format='8A')) cols.append(pyfits.Column(name='VELDEF', format='8A')) for name in ['PMRA', 'PMDEC', 'PMRA_ERR', 'PMDEC_ERR']: cols.append(pyfits.Column(name=name, format='D', unit='deg/yr')) cols.append(pyfits.Column(name='PARALLAX', format='E', unit='deg')) cols.append(pyfits.Column(name='PARA_ERR', format='E', unit='deg')) cols.append(pyfits.Column(name='SPECTYP', format='16A')) tabhdu = pyfits.new_table(cols, nrows=len(self.records)) # Make list of TargetRecord instances, ordered by target_id recs = self.records.values() recs.sort(lambda a, b: cmp(a.target_id, b.target_id)) # Insert data irow = 0 for r in recs: vals = [r.target_id, r.target, r.raep0, r.decep0, r.equinox, r.ra_err, r.dec_err, r.sysvel, r.veltyp, r.veldef, r.pmra, r.pmdec, r.pmra_err, r.pmdec_err, r.parallax, r.para_err, r.spectyp] for icol in range(len(cols)): tabhdu.data.field(icol)[irow] = vals[icol] irow += 1 tabhdu.name = 'OI_TARGET' hdr = tabhdu.header hdr.update('EXTVER', 1) hdr.update('OI_REVN', self.revision, comment='Revision number of the table definition') return tabhdu def FromTable(self, tabhdu): """Copy to data attributes from pyfits.BinTableHDU instance. Values from non-standard columns get assigned to extra data attributes. """ try: self.revision = tabhdu.header['OI_REVN'] except KeyError: raise OI_FITSConformError, "Missing keyword OI_REVN" cols = tabhdu.get_coldefs() self.records = {} for irow in range(len(tabhdu.data)): rec = TargetRecord() for name in cols.names: rec.__dict__[string.lower(name)] = tabhdu.data[irow].field(name) try: if self.records.has_key(rec.target_id): raise OI_FITSConformError, "Duplicate target_id" except AttributeError: raise OI_FITSConformError, "Missing column" self.records[rec.target_id] = rec class Wavelength: """Data for OI_WAVELENGTH FITS table. Data attributes are: revision -- (int) Revision number of the table definition insname -- (string) Name of detector eff_wave -- (float list) Effective wavelength of each channel (meters) eff_band -- (float list) Effective bandpass of each channel (meters) Methods are: ToTable() -- Return pyfits.BinTableHDU instance FromTable() -- Copy to data attributes from pyfits.BinTableHDU instance """ def __init__(self): self.revision = 1 self.eff_wave = [] self.eff_band = [] def ToTable(self, extver=1): """Return pyfits.BinTableHDU instance.""" if len(self.eff_wave) == 0 or len(self.eff_band) == 0: raise OI_FITSEmptyTableError, "No channels in OI_WAVELENGTH" col1 = pyfits.Column(name='EFF_WAVE', format='E', unit='m', array=array(self.eff_wave)) col2 = pyfits.Column(name='EFF_BAND', format='E', unit='m', array=array(self.eff_band)) tabhdu = pyfits.new_table([col1, col2]) tabhdu.name = 'OI_WAVELENGTH' hdr = tabhdu.header hdr.update('EXTVER', extver, comment='ID number of this OI_WAVELENGTH') hdr.update('OI_REVN', self.revision, comment='Revision number of the table definition') hdr.update('INSNAME', self.insname, comment='Name of detector') return tabhdu def FromTable(self, tabhdu): """Copy to data attributes from pyfits.BinTableHDU instance.""" try: self.revision = tabhdu.header['OI_REVN'] self.insname = tabhdu.header['INSNAME'] except KeyError: raise OI_FITSConformError, "Missing keyword" try: self.eff_wave = tabhdu.data.field('EFF_WAVE').tolist() self.eff_band = tabhdu.data.field('EFF_BAND').tolist() except NameError: raise OI_FITSConformError, "Missing column" class VisRecord: """Complex visibility record. Corresponds to one row of an OI_VIS FITS table. Data attributes are: target_id -- (int) Target number as index into Target.records time - (float) UTC time of observation (seconds) mjd -- (float) Modified Julian Day int_time -- (float) Integration time (seconds) visamp -- (float list, length nwave) Visibility amplitude visamperr -- (float list, length nwave) Error in visibility amplitude visphi -- (float list, length nwave) Visibility phase (degrees) visphierr -- (float list, length nwave) Error in visibility phase (degrees) ucoord -- (float) U coordinate of the data (meters) vcoord -- (float) V coordinate of the data (meters) sta_index -- (int 2-tuple) Station numbers contributing to the data flag -- (boolean list, length nwave) Flag """ pass class Vis: """Data for OI_VIS FITS table. Data attributes are: revision -- (int) revision number of the table definition date_obs -- (int 3-tuple: y, m, d) UTC start date of observations (or may be None) arrname -- (string) Identifies corresponding Array instance (may be None) insname -- (string) Identifies corresponding Wavelength instance records -- list of VisRecord instances nwave -- length of visamp, visamperr, visphi, visphierr, flag attributes of records Methods are: ToTable() -- Return pyfits.BinTableHDU instance FromTable() -- Copy to data attributes from pyfits.BinTableHDU instance """ def __init__(self): self.revision = 1 self.arrname = None self.nwave = 0 self.records = [] def ToTable(self, extver=1): """Return pyfits.BinTableHDU instance.""" if len(self.records) == 0: raise OI_FITSEmptyTableError, "No records in OI_VIS" # Define table structure cols = [] cols.append(pyfits.Column(name='TARGET_ID', format='I')) cols.append(pyfits.Column(name='TIME', format='D', unit='s')) cols.append(pyfits.Column(name='MJD', format='D', unit='day')) cols.append(pyfits.Column(name='INT_TIME', format='D', unit='s')) cols.append(pyfits.Column(name='VISAMP', format='%dD' % self.nwave)) cols.append(pyfits.Column(name='VISAMPERR', format='%dD' % self.nwave)) cols.append(pyfits.Column(name='VISPHI', format='%dD' % self.nwave, unit='deg')) cols.append(pyfits.Column(name='VISPHIERR', format='%dD' % self.nwave, unit='deg')) cols.append(pyfits.Column(name='UCOORD', format='D', unit='m')) cols.append(pyfits.Column(name='VCOORD', format='D', unit='m')) cols.append(pyfits.Column(name='STA_INDEX', format='2I')) cols.append(pyfits.Column(name='FLAG', format='%dL' % self.nwave)) tabhdu = pyfits.new_table(cols, nrows=len(self.records)) # Insert data irow = 0 for r in self.records: if self.nwave == 1: vals = [r.target_id, r.time, r.mjd, r.int_time, r.visamp[0], r.visamperr[0], r.visphi[0], r.visphierr[0], r.ucoord, r.vcoord, array(r.sta_index, type=Int16), r.flag[0]] else: vals = [r.target_id, r.time, r.mjd, r.int_time, array(r.visamp), array(r.visamperr), array(r.visphi), array(r.visphierr), r.ucoord, r.vcoord, array(r.sta_index, type=Int16), array(r.flag, type=Bool)] for icol in range(len(cols)): tabhdu.data.field(icol)[irow] = vals[icol] irow += 1 # Workaround for probable pyfits bug true = array([ord('T')]*self.nwave, type=UInt8) false = array([ord('F')]*self.nwave, type=UInt8) tabhdu.data._parent.field('FLAG')[:] = where(tabhdu.data.field('FLAG'), true, false) # Insert header records tabhdu.name = 'OI_VIS' hdr = tabhdu.header hdr.update('EXTVER', extver, comment='ID number of this OI_VIS') hdr.update('OI_REVN', self.revision, comment='Revision number of the table definition') hdr.update('DATE-OBS', '%04d-%02d-%02d' % self.date_obs, comment='UTC Start date of observations') if self.arrname is not None: hdr.update('ARRNAME', self.arrname, comment='Identifies corresponding OI_ARRAY') hdr.update('INSNAME', self.insname, comment='Identifies corresponding OI_WAVELENGTH') return tabhdu def FromTable(self, tabhdu): """Copy to data attributes from pyfits.BinTableHDU instance.""" try: self.revision = tabhdu.header['OI_REVN'] self.date_obs = tuple( map(int, string.split(tabhdu.header['DATE-OBS'], '-'))) # ARRNAME is optional try: self.arrname = tabhdu.header['ARRNAME'] except KeyError: self.arrname = None self.insname = tabhdu.header['INSNAME'] except KeyError: raise OI_FITSConformError, "Missing keyword" cols = tabhdu.get_coldefs() try: i = cols.names.index('VISAMP') except ValueError: raise OI_FITSConformError, "Missing column VISAMP" end = 0 while cols.formats[i][end] in string.digits: end += 1 if end != 0: self.nwave = int(cols.formats[i][:end]) else: self.nwave = 1 self.records = [] for irow in range(len(tabhdu.data)): row = tabhdu.data[irow] rec = VisRecord() try: rec.target_id = row.field('TARGET_ID') rec.time = row.field('TIME') rec.mjd = row.field('MJD') rec.int_time = row.field('INT_TIME') if self.nwave > 1: rec.visamp = row.field('VISAMP').tolist() rec.visamperr = row.field('VISAMPERR').tolist() rec.visphi = row.field('VISPHI').tolist() rec.visphierr = row.field('VISPHIERR').tolist() rec.flag = row.field('FLAG').tolist() else: rec.visamp = [row.field('VISAMP')] rec.visamperr = [row.field('VISAMPERR')] rec.visphi = [row.field('VISPHI')] rec.visphierr = [row.field('VISPHIERR')] rec.flag = [row.field('FLAG')] rec.ucoord = row.field('UCOORD') rec.vcoord = row.field('VCOORD') rec.sta_index = tuple(row.field('STA_INDEX').tolist()) except NameError: raise OI_FITSConformError, "Missing column" self.records.append(rec) class Vis2Record: """Visibility squared record. Corresponds to one row of an OI_VIS2 FITS table. Data attributes are: target_id -- (int) Target number as index into Target.records time - (float) UTC time of observation (seconds) mjd -- (float) Modified Julian Day int_time -- (float) Integration time (seconds) vis2data -- (float list, length nwave) Squared visibility vis2err -- (float list, length nwave) Error in squared visibility ucoord -- (float) U coordinate of the data (meters) vcoord -- (float) V coordinate of the data (meters) sta_index -- (int 2-tuple) Station numbers contributing to the data flag -- (boolean list, length nwave) Flag """ pass class Vis2: """Data for OI_VIS2 FITS table. Data attributes are: revision -- (int) revision number of the table definition date_obs -- (int 3-tuple: y, m, d) UTC start date of observations (or may be None) arrname -- (string) Identifies corresponding Array instance (may be None) insname -- (string) Identifies corresponding Wavelength instance records -- list of Vis2Record instances nwave -- length of vis2data, vis2err, flag attributes of records Methods are: ToTable() -- Return pyfits.BinTableHDU instance FromTable() -- Copy to data attributes from pyfits.BinTableHDU instance """ def __init__(self): self.revision = 1 self.arrname = None self.nwave = 0 self.records = [] def ToTable(self, extver=1): """Return pyfits.BinTableHDU instance.""" if len(self.records) == 0: raise OI_FITSEmptyTableError, "No records in OI_VIS2" # Define table structure cols = [] cols.append(pyfits.Column(name='TARGET_ID', format='I')) cols.append(pyfits.Column(name='TIME', format='D', unit='s')) cols.append(pyfits.Column(name='MJD', format='D', unit='day')) cols.append(pyfits.Column(name='INT_TIME', format='D', unit='s')) cols.append(pyfits.Column(name='VIS2DATA', format='%dD' % self.nwave)) cols.append(pyfits.Column(name='VIS2ERR', format='%dD' % self.nwave)) cols.append(pyfits.Column(name='UCOORD', format='D', unit='m')) cols.append(pyfits.Column(name='VCOORD', format='D', unit='m')) cols.append(pyfits.Column(name='STA_INDEX', format='2I')) cols.append(pyfits.Column(name='FLAG', format='%dL' % self.nwave)) tabhdu = pyfits.new_table(cols, nrows=len(self.records)) # Insert data irow = 0 for r in self.records: if self.nwave == 1: vals = [r.target_id, r.time, r.mjd, r.int_time, r.vis2data[0], r.vis2err[0], r.ucoord, r.vcoord, array(r.sta_index, type=Int16), r.flag[0]] else: vals = [r.target_id, r.time, r.mjd, r.int_time, array(r.vis2data), array(r.vis2err), r.ucoord, r.vcoord, array(r.sta_index, type=Int16), array(r.flag, type=Bool)] for icol in range(len(cols)): tabhdu.data.field(icol)[irow] = vals[icol] irow += 1 # Workaround for probable pyfits bug true = array([ord('T')]*self.nwave, type=UInt8) false = array([ord('F')]*self.nwave, type=UInt8) tabhdu.data._parent.field('FLAG')[:] = where(tabhdu.data.field('FLAG'), true, false) # Insert header records tabhdu.name = 'OI_VIS2' hdr = tabhdu.header hdr.update('EXTVER', extver, comment='ID number of this OI_VIS2') hdr.update('OI_REVN', self.revision, comment='Revision number of the table definition') if self.date_obs is not None: hdr.update('DATE-OBS', '%04d-%02d-%02d' % self.date_obs, comment='UTC Start date of observations') if self.arrname is not None: hdr.update('ARRNAME', self.arrname, comment='Identifies corresponding OI_ARRAY') hdr.update('INSNAME', self.insname, comment='Identifies corresponding OI_WAVELENGTH') return tabhdu def FromTable(self, tabhdu): """Copy to data attributes from pyfits.BinTableHDU instance.""" try: self.revision = tabhdu.header['OI_REVN'] self.date_obs = tuple( map(int, string.split(tabhdu.header['DATE-OBS'], '-'))) # ARRNAME is optional try: self.arrname = tabhdu.header['ARRNAME'] except KeyError: self.arrname = None self.insname = tabhdu.header['INSNAME'] except KeyError: raise OI_FITSConformError, "Missing keyword" cols = tabhdu.get_coldefs() try: i = cols.names.index('VIS2DATA') except ValueError: raise OI_FITSConformError, "Missing column" end = 0 while cols.formats[i][end] in string.digits: end += 1 if end != 0: self.nwave = int(cols.formats[i][:end]) else: self.nwave = 1 self.records = [] for irow in range(len(tabhdu.data)): row = tabhdu.data[irow] rec = Vis2Record() try: rec.target_id = row.field('TARGET_ID') rec.time = row.field('TIME') rec.mjd = row.field('MJD') rec.int_time = row.field('INT_TIME') if self.nwave > 1: rec.vis2data = row.field('VIS2DATA').tolist() rec.vis2err = row.field('VIS2ERR').tolist() rec.flag = row.field('FLAG').tolist() else: rec.vis2data = [row.field('VIS2DATA')] rec.vis2err = [row.field('VIS2ERR')] rec.flag = [row.field('FLAG')] rec.ucoord = row.field('UCOORD') rec.vcoord = row.field('VCOORD') rec.sta_index = tuple(row.field('STA_INDEX').tolist()) except NameError: raise OI_FITSConformError, "Missing column" self.records.append(rec) class T3Record: """Triple product record. Corresponds to one row of an OI_T3 FITS table. Data attributes are: target_id -- (int) Target number as index into Target.records time - (float) UTC time of observation (seconds) mjd -- (float) Modified Julian Day int_time -- (float) Integration time (seconds) t3amp -- (float list, length nwave) Visibility amplitude t3amperr -- (float list, length nwave) Error in visibility amplitude t3phi -- (float list, length nwave) Visibility phase (degrees) t3phierr -- (float list, length nwave) Error in visibility phase (degrees) u1coord -- (float) U coordinate of baseline AB of the triangle (meters) v1coord -- (float) V coordinate of baseline AB of the triangle (meters) u2coord -- (float) U coordinate of baseline BC of the triangle (meters) v2coord -- (float) V coordinate of baseline BC of the triangle (meters) sta_index -- (int 3-tuple) Station numbers contributing to the data flag -- (boolean list, length nwave) Flag """ pass class T3: """Data for OI_T3 FITS table. Data attributes are: revision -- (int) revision number of the table definition date_obs -- (int 3-tuple: y, m, d) UTC start date of observations (or may be None) arrname -- (string) Identifies corresponding Array instance (may be None) insname -- (string) Identifies corresponding Wavelength instance records -- list of T3Record instances nwave -- length of t3amp, t3amperr, t3phi, t3phierr, flag attributes of records Methods are: ToTable() -- Return pyfits.BinTableHDU instance FromTable() -- Copy to data attributes from pyfits.BinTableHDU instance """ def __init__(self): self.revision = 1 self.arrname = None self.nwave = 0 self.records = [] def ToTable(self, extver=1): """Return pyfits.BinTableHDU instance.""" if len(self.records) == 0: raise OI_FITSEmptyTableError, "No records in OI_T3" # Define table structure cols = [] cols.append(pyfits.Column(name='TARGET_ID', format='I')) cols.append(pyfits.Column(name='TIME', format='D', unit='s')) cols.append(pyfits.Column(name='MJD', format='D', unit='day')) cols.append(pyfits.Column(name='INT_TIME', format='D', unit='s')) cols.append(pyfits.Column(name='T3AMP', format='%dD' % self.nwave)) cols.append(pyfits.Column(name='T3AMPERR', format='%dD' % self.nwave)) cols.append(pyfits.Column(name='T3PHI', format='%dD' % self.nwave, unit='deg')) cols.append(pyfits.Column(name='T3PHIERR', format='%dD' % self.nwave, unit='deg')) cols.append(pyfits.Column(name='U1COORD', format='D', unit='m')) cols.append(pyfits.Column(name='V1COORD', format='D', unit='m')) cols.append(pyfits.Column(name='U2COORD', format='D', unit='m')) cols.append(pyfits.Column(name='V2COORD', format='D', unit='m')) cols.append(pyfits.Column(name='STA_INDEX', format='3I')) cols.append(pyfits.Column(name='FLAG', format='%dL' % self.nwave)) tabhdu = pyfits.new_table(cols, nrows=len(self.records)) # Insert data irow = 0 for r in self.records: if self.nwave == 1: vals = [r.target_id, r.time, r.mjd, r.int_time, r.t3amp[0], r.t3amperr[0], r.t3phi[0], r.t3phierr[0], r.u1coord, r.v1coord, r.u2coord, r.v2coord, array(r.sta_index, type=Int16), r.flag[0]] else: vals = [r.target_id, r.time, r.mjd, r.int_time, array(r.t3amp), array(r.t3amperr), array(r.t3phi), array(r.t3phierr), r.u1coord, r.v1coord, r.u2coord, r.v2coord, array(r.sta_index, type=Int16), array(r.flag, type=Bool)] for icol in range(len(cols)): tabhdu.data.field(icol)[irow] = vals[icol] irow += 1 # Workaround for probable pyfits bug true = array([ord('T')]*self.nwave, type=UInt8) false = array([ord('F')]*self.nwave, type=UInt8) tabhdu.data._parent.field('FLAG')[:] = where(tabhdu.data.field('FLAG'), true, false) # Insert header records tabhdu.name = 'OI_T3' hdr = tabhdu.header hdr.update('EXTVER', extver, comment='ID number of this OI_T3') hdr.update('OI_REVN', self.revision, comment='Revision number of the table definition') if self.date_obs is not None: hdr.update('DATE-OBS', '%04d-%02d-%02d' % self.date_obs, comment='UTC Start date of observations') if self.arrname is not None: hdr.update('ARRNAME', self.arrname, comment='Identifies corresponding OI_ARRAY') hdr.update('INSNAME', self.insname, comment='Identifies corresponding OI_WAVELENGTH') return tabhdu def FromTable(self, tabhdu): """Copy to data attributes from pyfits.BinTableHDU instance.""" try: self.revision = tabhdu.header['OI_REVN'] self.date_obs = tuple( map(int, string.split(tabhdu.header['DATE-OBS'], '-'))) # ARRNAME is optional try: self.arrname = tabhdu.header['ARRNAME'] except KeyError: self.arrname = None self.insname = tabhdu.header['INSNAME'] except KeyError: raise OI_FITSConformError, "Missing keyword" cols = tabhdu.get_coldefs() try: i = cols.names.index('T3AMP') except ValueError: raise OI_FITSConformError, "Missing column" end = 0 while cols.formats[i][end] in string.digits: end += 1 if end != 0: self.nwave = int(cols.formats[i][:end]) else: self.nwave = 1 self.records = [] for irow in range(len(tabhdu.data)): row = tabhdu.data[irow] rec = Vis2Record() try: rec.target_id = row.field('TARGET_ID') rec.time = row.field('TIME') rec.mjd = row.field('MJD') rec.int_time = row.field('INT_TIME') if self.nwave > 1: rec.t3amp = row.field('T3AMP').tolist() rec.t3amperr = row.field('T3AMPERR').tolist() rec.t3phi = row.field('T3PHI').tolist() rec.t3phierr = row.field('T3PHIERR').tolist() rec.flag = row.field('FLAG').tolist() else: rec.t3amp = [row.field('T3AMP')] rec.t3amperr = [row.field('T3AMPERR')] rec.t3phi = [row.field('T3PHI')] rec.t3phierr = [row.field('T3PHIERR')] rec.flag = [row.field('FLAG')] rec.u1coord = row.field('U1COORD') rec.v1coord = row.field('V1COORD') rec.u2coord = row.field('U2COORD') rec.v2coord = row.field('V2COORD') rec.sta_index = tuple(row.field('STA_INDEX').tolist()) except NameError: raise OI_FITSConformError, "Missing column" self.records.append(rec) def _test(inputName='bigtest.fits', outputName='junk.fits'): """Test code. Reads an existing OI-FITS file, writes it out again, then reads the copy. """ # Read file hlist = pyfits.open(inputName) oi = OI_FITS() oi.FromHDUList(hlist) hlist.close() # Check tables present oi.verify() # Write file oi.ToHDUList().writeto(outputName) # Read last output hlist = pyfits.open(outputName) oi2 = OI_FITS() oi2.FromHDUList(hlist) hlist.close() if __name__ == '__main__': _test()