from xradio._utils.zarr.common import _get_file_system_and_items
import xarray as xr
[docs]
def open_processing_set(
    ps_store: str,
    scan_intents: list | None = None,
    array_backend: str = "dask",
) -> xr.DataTree:
    """Create a lazy representation of a Processing Set (only meta-data is loaded into memory).

    Parameters
    ----------
    ps_store : str
        String of the path and name of the processing set. For example
        '/users/user_1/uid___A002_Xf07bba_Xbe5c_target.lsrk.vis.zarr'.
    scan_intents : list | None, optional
        A list of scan_intents to be opened, for example
        ['OBSERVE_TARGET#ON_SOURCE']. When given, the opened tree is filtered
        through ``ps_xdt.xr_ps.query(scan_intents=scan_intents)``.
        The scan_intents present in a processing set can be inspected through
        its summary (presumably ``processing_set_xdt.xr_ps.summary()``, i.e.
        the same ``xr_ps`` accessor used for the query).
        By default None, which will include all scan_intents.
    array_backend : str, optional
        The array backend to use for the data variables in the processing set.
        Options are 'dask' and 'xarray', both of which are lazy:

        - 'dask' opens the store with ``chunks={}`` and
          ``chunked_array_type="dask"``.
        - 'xarray' opens the store with ``chunks=None`` and
          ``chunked_array_type=None``.

        By default 'dask'.

    Returns
    -------
    xarray.DataTree
        Lazy representation of the processing set. If ``scan_intents`` is
        given, the result of the ``xr_ps.query`` call is returned instead of
        the full tree.

    Raises
    ------
    ValueError
        If ``array_backend`` is neither 'dask' nor 'xarray'. This is checked
        before the store is accessed.
    """
    # Validate the cheap, purely local argument first so that a typo in
    # ``array_backend`` fails immediately instead of after a (possibly remote)
    # file-system lookup.
    if array_backend == "dask":
        chunks = {}
        chunked_array_type = "dask"
    elif array_backend == "xarray":
        chunks = None
        chunked_array_type = None
    else:
        raise ValueError("array_backend must be either 'dask' or 'xarray'")

    # Only the file system object is needed here; the list of items in the
    # store is not used by this function.
    file_system, _ = _get_file_system_and_items(ps_store)

    import s3fs

    # S3-hosted stores have to be handed to xarray as a key/value mapping;
    # any other file system is opened directly from the path string.
    if isinstance(file_system, s3fs.core.S3FileSystem):
        store = s3fs.S3Map(root=ps_store, s3=file_system, check=False)
    else:
        store = ps_store

    ps_xdt = xr.open_datatree(
        store,
        engine="zarr",
        chunks=chunks,
        chunked_array_type=chunked_array_type,
    )

    # Future work is to add ASDM backend

    if scan_intents is None:
        return ps_xdt
    return ps_xdt.xr_ps.query(scan_intents=scan_intents)
# def open_processing_set(
# ps_store: str,
# intents: list = None,
# ): #-> ProcessingSet:
# """Creates a lazy representation of a Processing Set (only meta-data is loaded into memory).
# Parameters
# ----------
# ps_store : str
# String of the path and name of the processing set. For example '/users/user_1/uid___A002_Xf07bba_Xbe5c_target.lsrk.vis.zarr'.
# intents : list, optional
# A list of intents to be open for example ['OBSERVE_TARGET#ON_SOURCE']. The intents in a processing set can be seen by calling processing_set.summary().
# By default None, which will open all intents.
# Returns
# -------
# processing_set
# Lazy representation of processing set (data is represented by Dask.arrays).
# """
# from xradio.measurement_set import MeasurementSetXds
# file_system, ms_store_list = _get_file_system_and_items(ps_store)
# from xradio.measurement_set import ProcessingSet
# ps = ProcessingSet()
# data_group = "base"
# for ms_name in ms_store_list:
# # try:
# ms_store = os.path.join(ps_store, ms_name)
# correlated_store = os.path.join(ms_store, "correlated_xds")
# xds = _open_dataset(correlated_store, file_system)
# data_groups = xds.attrs["data_groups"]
# if (intents is None) or (
# bool(set(xds.attrs["partition_info"]["intents"]).intersection(intents))
# ):
# sub_xds_dict, field_and_source_xds_dict = _open_sub_xds(
# ms_store, file_system=file_system, data_groups=data_groups
# )
# xds.attrs = {
# **xds.attrs,
# **sub_xds_dict,
# }
# for data_group_name, data_group_vals in data_groups.items():
# xds[data_group_vals["correlated_data"]].attrs[
# "field_and_source_xds"
# ] = field_and_source_xds_dict[data_group_name]
# ps[ms_name] = MeasurementSetXds(xds)
# # except Exception as e:
# # logger.warning(f"Could not open {ms_name} due to {e}")
# # continue
# return ps
# def _open_sub_xds(ms_store, file_system, data_groups, load=False):
# sub_xds_dict = {}
# field_and_source_xds_dict = {}
# if isinstance(file_system, s3fs.core.S3FileSystem):
# file_names = [
# bd.split(sep="/")[-1] for bd in file_system.listdir(ms_store, detail=False)
# ]
# else:
# file_names = file_system.listdir(ms_store)
# file_names = [item for item in file_names if not item.startswith(".")]
# file_names.remove("correlated_xds")
# field_dict = {"field_and_source_xds_" + key: key for key in data_groups.keys()}
# # field_and_source_xds_name_start = "FIELD"
# for n in file_names:
# xds = _open_dataset(
# os.path.join(ms_store, n), load=load, file_system=file_system
# )
# # Skip empty tables
# if not xds.coords and not xds.data_vars:
# continue
# if n in field_dict.keys():
# field_and_source_xds_dict[field_dict[n]] = xds
# else:
# sub_xds_dict[n] = xds
# return sub_xds_dict, field_and_source_xds_dict
# def _get_data_name(xds, data_group):
# if "visibility" in xds.attrs["data_groups"][data_group]:
# data_name = xds.attrs["data_groups"][data_group]["visibility"]
# elif "spectrum" in xds.attrs["data_groups"][data_group]:
# data_name = xds.attrs["data_groups"][data_group]["spectrum"]
# else:
# error_message = (
# "No Visibility or Spectrum data variable found in data_group "
# + data_group
# + "."
# )
# logger.exception(error_message)
# raise ValueError(error_message)
# return data_name