AEJxLPS (Auroral electrojets SECS)#

Abstract: Access to the AEBS products, SECS type. This notebook reuses code from the previous notebook to build a routine flexible enough to plot either the LC or the SECS products, as a prototype quicklook routine.

%load_ext watermark
%watermark -i -v -p viresclient,pandas,xarray,matplotlib
Python implementation: CPython
Python version       : 3.11.6
IPython version      : 8.18.0

viresclient: 0.12.0
pandas     : 2.1.3
xarray     : 2023.12.0
matplotlib : 3.8.2
from viresclient import SwarmRequest
import datetime as dt
import numpy as np
import pandas as pd
import xarray as xr
import matplotlib.pyplot as plt
import matplotlib as mpl

request = SwarmRequest()

AEBS product information#

See the previous notebook, “Demo AEBS products (LC)”, for an introduction to these products.

Function to request data from VirES and reshape it#

def fetch_data(start_time=None, end_time=None, spacecraft=None, AEBS_type="L"):
    """DUPLICATED FROM PREVIOUS NOTEBOOK. TO BE REFACTORED"""

    # Fetch data from VirES
    auxiliaries = ['OrbitNumber', 'QDLat', 'QDOrbitDirection', 'OrbitDirection', 'MLT']
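    if AEBS_type == "L":
        measurement_vars = ["J_NE"]
    elif AEBS_type == "S":
        measurement_vars = ["J_CF_NE", "J_DF_NE"]
    else:
        # Fail early instead of hitting an undefined variable further down
        raise ValueError(f"AEBS_type must be 'L' or 'S', not {AEBS_type!r}")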
    # Fetch LPL/LPS
    request.set_collection(f'SW_OPER_AEJ{spacecraft}LP{AEBS_type}_2F')
    request.set_products(
        measurements=measurement_vars,
        auxiliaries=auxiliaries,
    )
    data = request.get_between(start_time, end_time, asynchronous=False, show_progress=False)
    ds_lp = data.as_xarray()
    # Fetch LPL/LPS Quality
    request.set_collection(f'SW_OPER_AEJ{spacecraft}LP{AEBS_type}_2F:Quality')
    request.set_products(
        measurements=['RMS_misfit', 'Confidence'],
    )
    data = request.get_between(start_time, end_time, asynchronous=False, show_progress=False)
    ds_lpq = data.as_xarray()
    # Fetch PBL/PBS
    request.set_collection(f'SW_OPER_AEJ{spacecraft}PB{AEBS_type}_2F')
    request.set_products(
        measurements=['PointType', 'Flags'],
        auxiliaries=auxiliaries
    )
    data = request.get_between(start_time, end_time, asynchronous=False, show_progress=False)
    ds_pb = data.as_xarray()

    # Meaning of PointType
    PointType_meanings = {
        "WEJ_peak": 0,  # minimum
        "EEJ_peak": 1,  # maximum
        "WEJ_eq_bound_s": 2,  # equatorward (pair start)
        "EEJ_eq_bound_s": 3,
        "WEJ_po_bound_s": 6,  # poleward
        "EEJ_po_bound_s": 7,
        "WEJ_eq_bound_e": 10,  # equatorward (pair end)
        "EEJ_eq_bound_e": 11,
        "WEJ_po_bound_e": 14,  # poleward
        "EEJ_po_bound_e": 15,
    }
    # Add new data variables (boolean Type) according to the dictionary above
    ds_pb = ds_pb.assign(
        {name: ds_pb["PointType"] == PointType_meanings[name]
        for name in PointType_meanings.keys()}
    )

    # Merge datasets together
    def drop_duplicate_times(_ds):
        _, index = np.unique(_ds['Timestamp'], return_index=True)
        return _ds.isel(Timestamp=index)
    def merge_attrs(_ds1, _ds2):
        attrs = {"Sources":[], "MagneticModels":[], "AppliedFilters":[]}
        for item in ["Sources", "MagneticModels", "AppliedFilters"]:
            attrs[item] = list(set(_ds1.attrs[item] + _ds2.attrs[item]))
        return attrs
    # Create new dataset from just the newly created PointType arrays
    # This is created on a non-repeating Timestamp coordinate
    ds = xr.Dataset(
        {name: ds_pb[name].where(ds_pb[name], drop=True)
        for name in PointType_meanings.keys()}
    )
    # Merge in the positional and auxiliary data
    data_vars = list(set(ds_pb.data_vars).difference(set(PointType_meanings.keys())))
    data_vars.remove("PointType")
    ds = ds.merge(
        (ds_pb[data_vars]
            .pipe(drop_duplicate_times))
    )
    # Merge together with the LPL/LPS data
    # Note that the Timestamp coordinates aren't equal

    # Separately merge data with matching and missing time sample points in ds_lp
    idx_present = list(set(ds["Timestamp"].values).intersection(set(ds_lp["Timestamp"].values)))
    idx_missing = list(set(ds["Timestamp"].values).difference(set(ds_lp["Timestamp"].values)))
    # Override prioritises the first dataset (ds_lp) where there are conflicts
    ds2 = ds_lp.merge(ds.sel(Timestamp=idx_present), join="outer", compat="override")
    ds2 = ds2.merge(ds.sel(Timestamp=idx_missing), join="outer")
    # Update the metadata
    ds2.attrs = merge_attrs(ds_lp, ds_pb)

    # Store the point type arrays as uint8 for performance
    # Fill the gaps before casting: NaN cannot be represented as an integer,
    # and casting first triggers "RuntimeWarning: invalid value encountered in cast"
    # Note that the .where operations later cast them back to float64 since gaps are filled with nan
    for name in PointType_meanings.keys():
        ds2[name] = ds2[name].fillna(0).astype("uint8")

    ds = ds2

    # Append the PBL Flags information into the LPL:Quality dataset to use as a lookup table
    ds_lpq = ds_lpq.assign(
        Flags_PBL=
        ds_pb["Flags"]
        .pipe(drop_duplicate_times)
        .reindex_like(ds_lpq, method="nearest"),
    )

    return ds, ds_lpq
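
As a small illustration of the PointType lookup used in fetch_data, the sketch below inverts the dictionary so that integer codes can be turned back into names. The bit-level reading in describe_point_type is only an inference from the ten codes listed above (bit 0 selects WEJ/EEJ, bit 1 peak/boundary, bit 2 equatorward/poleward, bit 3 pair start/end). It is not taken from the product documentation, so it should be checked against the AEBS product definition before being relied on. The helper names are hypothetical.

```python
# Codes copied from the PointType_meanings dictionary in fetch_data
POINT_TYPE_MEANINGS = {
    "WEJ_peak": 0, "EEJ_peak": 1,
    "WEJ_eq_bound_s": 2, "EEJ_eq_bound_s": 3,
    "WEJ_po_bound_s": 6, "EEJ_po_bound_s": 7,
    "WEJ_eq_bound_e": 10, "EEJ_eq_bound_e": 11,
    "WEJ_po_bound_e": 14, "EEJ_po_bound_e": 15,
}
# Reverse lookup: integer code -> name
POINT_TYPE_NAMES = {code: name for name, code in POINT_TYPE_MEANINGS.items()}


def describe_point_type(code):
    """Rebuild the name from individual bits (ASSUMED layout, inferred from the codes above)."""
    jet = "EEJ" if code & 0b0001 else "WEJ"
    if not code & 0b0010:
        return f"{jet}_peak"
    side = "po" if code & 0b0100 else "eq"
    pair = "e" if code & 0b1000 else "s"
    return f"{jet}_{side}_bound_{pair}"


# The inferred bit layout reproduces every name in the dictionary
for code, name in POINT_TYPE_NAMES.items():
    assert describe_point_type(code) == name
print(POINT_TYPE_NAMES[14])
```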

Plotting function#

# Bit numbers which indicate non-nominal state
# Check SW-DS-DTU-GS-003_AEBS_PDD for details
BITS_PBL_FLAGS_EEJ_MINOR = (2, 3, 6)
BITS_PBL_FLAGS_WEJ_MINOR = (4, 5, 6)
BITS_PBL_FLAGS_EEJ_BAD = (0, 7, 8, 11)
BITS_PBL_FLAGS_WEJ_BAD = (1, 9, 10, 12)

def check_PBL_Flags(flags=0b0, EJ_type="WEJ"):
    """Return "good", "poor", or "bad" depending on status"""
    def _check_bits(bitno_set):
        return any(flags & (1 << bitno) for bitno in bitno_set)
    if EJ_type == "WEJ":
        if _check_bits(BITS_PBL_FLAGS_WEJ_BAD):
            return "bad"
        elif _check_bits(BITS_PBL_FLAGS_WEJ_MINOR):
            return "poor"
        else:
            return "good"
    elif EJ_type == "EEJ":
        if _check_bits(BITS_PBL_FLAGS_EEJ_BAD):
            return "bad"
        elif _check_bits(BITS_PBL_FLAGS_EEJ_MINOR):
            return "poor"
        else:
            return "good"

glyphs = {
    "WEJ_peak": {"marker": 'v', "color":'tab:red'},  # minimum
    "EEJ_peak": {"marker": '^', "color":'tab:purple'},  # maximum
    "WEJ_eq_bound_s": {"marker": '>', "color":'black'},  # equatorward (pair start)
    "EEJ_eq_bound_s": {"marker": '>', "color":'black'},
    "WEJ_po_bound_s": {"marker": '>', "color":'black'},  # poleward
    "EEJ_po_bound_s": {"marker": '>', "color":'black'},
    "WEJ_eq_bound_e": {"marker": '<', "color":'black'},  # equatorward (pair end)
    "EEJ_eq_bound_e": {"marker": '<', "color":'black'},
    "WEJ_po_bound_e": {"marker": '<', "color":'black'},  # poleward
    "EEJ_po_bound_e": {"marker": '<', "color":'black'},
}


def plot_stack(ds, ds_lpq, hemisphere="North", x_axis="Latitude", AEBS_type="L"):
    # Identify which variable to plot from dataset
    # If accessing the SECS (LPS) data, sum the DF & CF parts
    if "J_CF_NE" in ds.data_vars:
        ds["J_NE"] = ds["J_DF_NE"] + ds["J_CF_NE"]
    plotvar = "J_NE"
    orbdir = "OrbitDirection" if x_axis=="Latitude" else "QDOrbitDirection"
    markersize = 1 if AEBS_type=="S" else 5
    # Select hemisphere
    if hemisphere == "North":
        ds = ds.where(ds["Latitude"]>0, drop=True)
    elif hemisphere == "South":
        ds = ds.where(ds["Latitude"]<0, drop=True)
    # Generate plot with split by columns: ascending/descending to/from pole
    #                          by rows: successive orbits
    # Skip when no data available
    if len(ds["Timestamp"]) == 0:
        return None, None
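    # squeeze=False keeps axes two-dimensional even when only one orbit is present,
    # so that the axes[i, j] indexing below always works
    fig, axes = plt.subplots(
        nrows=len(ds.groupby("OrbitNumber")), ncols=2, sharex="col", sharey="all",
        figsize=(10, 20), squeeze=False
    )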
    max_ylim = np.max(np.abs(ds[plotvar].sel({"NE": "E"})))
    # Loop through each orbit
    for i, (_, ds_orbit) in enumerate(ds.groupby("OrbitNumber")):
        if hemisphere == "North":
            ds_orb_asc = ds_orbit.where(ds_orbit[orbdir] == 1, drop=True)
            ds_orb_desc = ds_orbit.where(ds_orbit[orbdir] == -1, drop=True)
        elif hemisphere == "South":
            ds_orb_asc = ds_orbit.where(ds_orbit[orbdir] == -1, drop=True)
            ds_orb_desc = ds_orbit.where(ds_orbit[orbdir] == 1, drop=True)
        # Loop through ascending and descending sections
        for j, _ds in enumerate((ds_orb_asc, ds_orb_desc)):
            if len(_ds.Timestamp) == 0:
                continue
            # Line plot of current strength
            axes[i, j].plot(
                _ds[x_axis], _ds[plotvar].sel({"NE": "E"}),
                color="tab:blue", marker=".", markersize=markersize, linestyle=""
            )
            axes[i, j].plot(
                _ds[x_axis], _ds[plotvar].sel({"NE": "N"}),
                color="tab:grey", marker=".", markersize=markersize, linestyle=""
            )
            # Plot glyphs at the peaks and boundaries locations
            for name in glyphs.keys():
                __ds = _ds.where(_ds[name], drop=True)
                try:
                    for lat in __ds[x_axis]:
                        axes[i, j].plot(
                            lat, 0,
                            marker=glyphs[name]["marker"], color=glyphs[name]["color"]
                        )
                except Exception:
                    pass
            # Identify Quality and Flags info
            # Use either the start time of the section or the end, depending on asc or desc
            index = 0 if j == 0 else -1
            t = _ds["Timestamp"].isel(Timestamp=index).values
            _ds_qualflags = ds_lpq.sel(Timestamp=t, method="nearest")
            pbl_flags = int(_ds_qualflags["Flags_PBL"].values)
            lpl_rms_misfit = float(_ds_qualflags["RMS_misfit"].values)
            lpl_confidence = float(_ds_qualflags["Confidence"].values)
            # Shade WEJ and EEJ regions, only if well-defined
            # def _shade_EJ_region(_ds=None, EJ="WEJ", color="tab:red", alpha=0.3):
            wej_status = check_PBL_Flags(pbl_flags, "WEJ")
            eej_status = check_PBL_Flags(pbl_flags, "EEJ")
            if wej_status in ["good", "poor"]:
                alpha = 0.3 if wej_status == "good" else 0.1
                try:
                    WEJ_left = _ds.where(
                        (_ds["WEJ_eq_bound_s"] == 1) | (_ds["WEJ_po_bound_s"] == 1), drop=True)
                    WEJ_right = _ds.where(
                        (_ds["WEJ_eq_bound_e"] == 1) | (_ds["WEJ_po_bound_e"] == 1), drop=True)
                    x1 = WEJ_left[x_axis][0]
                    x2 = WEJ_right[x_axis][0]
                    axes[i, j].fill_betweenx(
                        [-max_ylim, max_ylim], [x1, x1], [x2, x2], color="tab:red", alpha=alpha)
                except Exception:
                    pass
            if eej_status in ["good", "poor"]:
                alpha = 0.3 if eej_status == "good" else 0.15
                try:
                    EEJ_left = _ds.where(
                        (_ds["EEJ_eq_bound_s"] == 1) | (_ds["EEJ_po_bound_s"] == 1), drop=True)
                    EEJ_right = _ds.where(
                        (_ds["EEJ_eq_bound_e"] == 1) | (_ds["EEJ_po_bound_e"] == 1), drop=True)
                    x1 = EEJ_left[x_axis][0]
                    x2 = EEJ_right[x_axis][0]
                    axes[i, j].fill_betweenx(
                        [-max_ylim, max_ylim], [x1, x1], [x2, x2], color="tab:purple", alpha=alpha)
                except Exception:
                    pass
            # Write the LPL:Quality and PBL Flags info
            ha = "right" if j == 0 else "left"
            textx = 0.98 if j == 0 else 0.02
            axes[i, j].text(
                textx, 0.95,
                f"RMS Misfit {np.round(lpl_rms_misfit, 2)}; Confidence {np.round(lpl_confidence, 2)}",
                transform=axes[i, j].transAxes, verticalalignment="top", horizontalalignment=ha
            )
            axes[i, j].text(
                textx, 0.05,
                f"PBL Flags {pbl_flags:013b}",
                transform=axes[i, j].transAxes, verticalalignment="bottom", horizontalalignment=ha
            )
        # Write the start/end time and MLT of the section, and the orbit number
        def _format_utc(t):
            return f"UTC {t.strftime('%H:%M')}"
        def _format_mlt(mlt):
            hour, fraction = divmod(mlt, 1)
            t = dt.time(int(hour), minute=int(60*fraction))
            return f"MLT {t.strftime('%H:%M')}"
        try:
            # Left part (section starting UTC, MLT, OrbitNumber)
            time_s = pd.to_datetime(ds_orb_asc["Timestamp"].isel(Timestamp=0).data)
            mlt_s = ds_orb_asc["MLT"].dropna(dim="Timestamp").isel(Timestamp=0).data
            orbit_number = int(ds_orb_asc["OrbitNumber"].isel(Timestamp=0).data)
            axes[i, 0].text(
                0.01, 0.95, f"{_format_utc(time_s)}\n{_format_mlt(mlt_s)}",
                transform=axes[i, 0].transAxes, verticalalignment="top"
            )
            axes[i, 0].text(
                0.01, 0.05, f"Orbit {orbit_number}",
                transform=axes[i, 0].transAxes, verticalalignment="bottom"
            )
        except Exception:
            pass
        try:
            # Right part (section ending UTC, MLT)
            time_e = pd.to_datetime(ds_orb_desc["Timestamp"].isel(Timestamp=-1).data)
            mlt_e = ds_orb_desc["MLT"].dropna(dim="Timestamp").isel(Timestamp=-1).data
            axes[i, 1].text(
                0.99, 0.95, f"{_format_utc(time_e)}\n{_format_mlt(mlt_e)}",
                transform=axes[i, 1].transAxes, verticalalignment="top", horizontalalignment="right"
            )
        except Exception:
            pass
    # Extra config of axes and figure text
    axes[0, 0].set_ylim(-max_ylim, max_ylim)
    if hemisphere == "North":
        axes[0, 0].set_xlim(50, 90)
        axes[0, 1].set_xlim(90, 50)
    elif hemisphere == "South":
        axes[0, 0].set_xlim(-50, -90)
        axes[0, 1].set_xlim(-90, -50)
    for ax in axes.flatten():
        ax.grid()
    axes[-1, 0].set_xlabel(x_axis)
    axes[-1, 0].set_ylabel("Horizontal currents\n[ A.km$^{-1}$ ]")
    time = pd.to_datetime(ds["Timestamp"].isel(Timestamp=0).data)
    spacecraft = ds["Spacecraft"].dropna(dim="Timestamp").isel(Timestamp=0).data
    AEBS_type_name = "LC" if AEBS_type == "L" else "SECS"
    fig.text(
        0.5, 0.9, f"{time.strftime('%Y-%m-%d')}\nSwarm {spacecraft}\n{hemisphere}\nAEBS: {AEBS_type_name}",
        transform=fig.transFigure, horizontalalignment="center",
    )
    fig.subplots_adjust(wspace=0, hspace=0)
    return fig, axes
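
The flag test in check_PBL_Flags can be tried out without any data. The sketch below repeats the bit test on a few hand-made flag values. The bit numbers are the WEJ ones listed above (see SW-DS-DTU-GS-003_AEBS_PDD for their meaning), while the helper names bits_set and wej_status and the example values are made up for illustration.

```python
# Bit numbers copied from the constants above (WEJ only, for brevity)
BITS_WEJ_MINOR = (4, 5, 6)
BITS_WEJ_BAD = (1, 9, 10, 12)


def bits_set(flags, bit_numbers):
    """Return the subset of bit_numbers that are set in the integer flags."""
    return [bitno for bitno in bit_numbers if flags & (1 << bitno)]


def wej_status(flags):
    """Same decision order as check_PBL_Flags: "bad" takes priority over "poor"."""
    if bits_set(flags, BITS_WEJ_BAD):
        return "bad"
    if bits_set(flags, BITS_WEJ_MINOR):
        return "poor"
    return "good"


# Hand-made example values (not real product data)
for flags in (0b0, 0b10000, 0b10010, 1 << 12):
    print(f"{flags:013b}", wej_status(flags))
```

Printing the flags with the 013b format, as the plot annotation does, makes it easy to read off which bit positions are set.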

Fetching and plotting function#

def quicklook(day="2015-01-01", hemisphere="North", spacecraft="A", AEBS_type="L", xaxis="Latitude"):
    start_time = dt.datetime.fromisoformat(day)
    end_time = start_time + dt.timedelta(days=1)
    ds, ds_lpq = fetch_data(start_time, end_time, spacecraft, AEBS_type)
    fig, axes = plot_stack(ds, ds_lpq, hemisphere, xaxis, AEBS_type)
    return ds, fig, axes
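
To make the request concrete, this sketch shows what quicklook passes on to fetch_data: a one-day interval built from the ISO date string, and the three collection names assembled from the spacecraft letter and AEBS_type. The helper names day_window and collection_names are hypothetical; the name patterns are copied from fetch_data.

```python
import datetime as dt


def day_window(day):
    """Return (start, end) datetimes covering one full day."""
    start_time = dt.datetime.fromisoformat(day)
    return start_time, start_time + dt.timedelta(days=1)


def collection_names(spacecraft, AEBS_type):
    """Collection names requested by fetch_data for one spacecraft and product type."""
    return [
        f"SW_OPER_AEJ{spacecraft}LP{AEBS_type}_2F",
        f"SW_OPER_AEJ{spacecraft}LP{AEBS_type}_2F:Quality",
        f"SW_OPER_AEJ{spacecraft}PB{AEBS_type}_2F",
    ]


start, end = day_window("2016-01-01")
print(start.isoformat(), "to", end.isoformat())
print(collection_names("A", "S"))
```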

Consecutive orbits are shown in consecutive rows, centered over the pole. The starting and ending times (UTC and MLT) of the orbital section are shown at the left and right. Westward (WEJ) and Eastward (EEJ) electrojet extents and peak intensities are indicated:

  • Blue dots: Estimated current density in Eastward direction, J_NE (E)

  • Grey dots: Estimated current density in Northward direction, J_NE (N)

  • Red/Purple shaded region: WEJ/EEJ extent (boundaries marked by black triangles)

  • Red/Purple triangles: Locations of peak WEJ/EEJ intensity
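
The MLT labels in the panel corners come from converting a decimal hour into an HH:MM string. A minimal standalone version of that conversion is sketched below. The wrap-around with % 24 is an addition that is not in the plotting code above, included on the assumption that a value of exactly 24 could occur.

```python
import datetime as dt


def format_mlt(mlt):
    """Format magnetic local time given in decimal hours as 'MLT HH:MM'."""
    # Split into whole hours and the fractional part (ASSUMPTION: wrap 24 -> 0)
    hour, fraction = divmod(float(mlt) % 24, 1)
    t = dt.time(int(hour), minute=int(60 * fraction))
    return f"MLT {t.strftime('%H:%M')}"


print(format_mlt(13.5))
print(format_mlt(0.25))
```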

Set AEBS_type to "S" to get the SECS results, or to "L" to get the LC results:
SECS = Spherical Elementary Current Systems method
LC = Line Current method

Notes:
The code is currently quite fragile and fails on some days. Sometimes the electrojet regions are not shaded correctly. Only the horizontal currents are shown.
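
Because some days fail, it can help to wrap the call when producing quicklooks in bulk. The following is only a sketch under that assumption: run_for_days is a hypothetical helper, and fake_quicklook is a stand-in so that the example runs without a VirES connection. In practice quicklook would be passed in its place.

```python
import datetime as dt


def run_for_days(func, first_day, n_days, **kwargs):
    """Call func(day=..., **kwargs) for consecutive days, collecting failures."""
    results, failures = {}, {}
    start = dt.date.fromisoformat(first_day)
    for offset in range(n_days):
        day = (start + dt.timedelta(days=offset)).isoformat()
        try:
            results[day] = func(day=day, **kwargs)
        except Exception as err:  # keep going, but record what went wrong
            failures[day] = repr(err)
    return results, failures


def fake_quicklook(day, **kwargs):
    """Stand-in for quicklook: fails on one arbitrary day to exercise the wrapper."""
    if day == "2016-01-02":
        raise ValueError("no data")
    return f"plotted {day}"


results, failures = run_for_days(fake_quicklook, "2016-01-01", 3, hemisphere="North")
print(sorted(results), failures)
```

Recording the error message per day, rather than silently skipping, keeps a list of dates that can be revisited when the routine is made more robust.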

quicklook(day="2016-01-01", hemisphere="North", spacecraft="A", AEBS_type="S", xaxis="Latitude");
../_images/fd1a55bab255a2350cb68ba949ee328f8ebda38c6c47158b456392dc2027bdb8.png
quicklook(day="2016-01-01", hemisphere="North", spacecraft="A", AEBS_type="L", xaxis="Latitude");
../_images/d6c3a975832ba4d21172a901ed862532926608f5c824679e4ba206943016788f.png