Han Lab ODNP data processingΒΆ

An example user-defined function for processing Han Lab ODNP data with the DNPLab package.

For the function below the call would look something like,

"""
lsobject = ls.start(
    parent_directory,
    classifiers=["tcorr", "ksigma"],
    function=han_lab.calc_odnp,
    function_args=hyd_dict,
)

lsobject.drive()
"""
# parent_directory contains folders of han_lab data collected using "rb_dnp1" at the CNSI
# facility. Add patterns, skip, date searching, etc. according to the lsframe docs. The
# "hyd_dict" is the dictionary of input constants for dnpHydration, according to DNPLab
# docs.

Import DNPLab and any other packages that may be needed for the functions,

import dnplab as dnp
import numpy as np
import os
import copy

Function from hydrationGUI of DNPLab for optimizing center of integration window,

def optCenter(ws, width, starting_center, phase):
    optcenter_workspace = copy.deepcopy(ws)
    intgrl_array = []
    indx = range(starting_center - 50, starting_center + 50)
    optcenter_workspace["proc"].values *= np.exp(-1j * phase)
    for k in indx:
        dnp.dnpTools.integrate(
            optcenter_workspace,
            integrate_center=k,
            integrate_width=width,
        )
        if len(optcenter_workspace["integrals"].values) > 1:
            intgrl_array.append(sum(abs(optcenter_workspace["integrals"].real.values)))
        else:
            intgrl_array.append(abs(optcenter_workspace["integrals"].real.values[-1]))

    cent = np.argmax(intgrl_array)

    return indx[cent]

Function from hydrationGUI of DNPLab for optimizing phase,

def optPhase(ws, width, starting_center, starting_phase):
    temp_data = ws["proc"][
        "f2", (starting_center - width, starting_center + width)
    ].values

    phases = np.linspace(
        starting_phase - np.pi / 2, starting_phase + np.pi / 2, 100
    ).reshape(1, -1)
    rotated_data = (temp_data.reshape(-1, 1)) * np.exp(-1j * phases)
    bestindex = np.argmax(
        (np.real(rotated_data) ** 2).sum(axis=0)
        / (np.imag(rotated_data) ** 2).sum(axis=0)
    )
    starting_phase = phases[0, bestindex]

    if ws["proc"].ndim == 2:

        phases = np.linspace(
            starting_phase - np.pi / 4,
            starting_phase + np.pi / 4,
            100,
        )
        imag_sum = []
        for indx, k in enumerate(phases):
            ws_rot = copy.deepcopy(ws)
            ws_rot["proc"].values *= np.exp(-1j * k)
            dnp.dnpTools.integrate(
                ws_rot,
                integrate_center=starting_center,
                integrate_width=width * 2,
            )
            imag_sum.append(np.sum(abs(ws_rot["proc"].imag.values * -1j)))

        starting_phase = phases[np.argmin(imag_sum)]

    base_data1 = ws["proc"][
        "f2",
        (
            (starting_center - width * 4),
            (starting_center - width / 2),
        ),
    ].values
    base_data2 = ws["proc"][
        "f2",
        (
            (starting_center + width / 2),
            (starting_center + width * 4),
        ),
    ].values
    base_data = np.concatenate((base_data2, base_data1))

    phases = np.linspace(
        starting_phase - np.pi / 4, starting_phase + np.pi / 4, 100
    ).reshape(1, -1)
    rotated_data = (base_data.reshape(-1, 1)) * np.exp(-1j * phases)
    bestindex = np.argmin(abs(np.real(rotated_data)).sum(axis=0))

    return phases[0, bestindex]

Function from hydrationGUI of DNPLab for optimizing integration window width,

def optWidth(ws, starting_width, center, phase):
    ydata = abs(
        np.real(
            ws["proc"][
                "f2",
                (
                    center - starting_width / 2,
                    center + starting_width / 2,
                ),
            ].values
            * np.exp(-1j * phase)
        )
    )
    xdata = np.ravel(
        ws["proc"][
            "f2",
            (
                center - starting_width / 2,
                center + starting_width / 2,
            ),
        ].coords["f2"]
    )
    qual_factor = 1 / 3
    if ws["proc"].ndim == 1:
        one_third = np.where(ydata > max(ydata) * qual_factor)
        one_third = np.ravel(one_third)

        center = optCenter(
            ws, (xdata[one_third[-1]] - xdata[one_third[0]]), center, phase
        )

        return center, (xdata[one_third[-1]] - xdata[one_third[0]])
    else:
        min_x = []
        max_x = []
        for k in range(0, ydata.shape[1]):
            one_third = np.where(ydata[:, k] > max(ydata[:, k]) * qual_factor)
            one_third = np.ravel(one_third)
            min_x.append(xdata[one_third[0]])
            max_x.append(xdata[one_third[-1]])

        center = optCenter(ws, max(max_x) - min(min_x), center, phase)

        return center, max(max_x) - min(min_x)

Auto-process function from hydrationGUI. The function returns zeros where errors are encountered.

def calc_odnp(path, hyd):

    print("Working on: " + path)

    folder_structure_p0 = 5
    folder_structure_enh = range(6, 27)
    folder_structure_T1 = range(28, 33)
    folder_structure_T10 = 304

    E_power_List = dnp.dnpIO.cnsi.get_powers(
        path,
        "power",
        folder_structure_enh,
    )
    Epowers = np.add(E_power_List, 21.9992)
    Epowers = np.divide(Epowers, 10)
    Epowers = np.power(10, Epowers)
    Epowers = np.multiply(1e-3, Epowers)

    T1_power_List = dnp.dnpIO.cnsi.get_powers(
        path,
        "t1_powers",
        folder_structure_T1,
    )
    T1powers = np.add(T1_power_List, 21.9992)
    T1powers = np.divide(T1powers, 10)
    T1powers = np.power(10, T1powers)
    T1powers = np.multiply(1e-3, T1powers)

    folder_structure_all = []
    folder_structure_all.append(folder_structure_p0)
    for k in folder_structure_enh:
        folder_structure_all.append(k)
    for k in folder_structure_T1:
        folder_structure_all.append(k)
    folder_structure_all.append(folder_structure_T10)

    Ep = []
    T1 = []
    for _, folder_num in enumerate(folder_structure_all):

        folder_path = os.path.join(path, str(folder_num))

        data = dnp.dnpImport.load(folder_path)
        ws = dnp.create_workspace("proc", data)

        dnp.dnpNMR.remove_offset(ws)
        dnp.dnpNMR.window(
            ws,
            linewidth=10,
        )
        dnp.dnpNMR.fourier_transform(ws, zero_fill_factor=2)

        if ws["proc"].ndim == 2:
            dnp.dnpNMR.align(ws)
            max_index = np.argmax(abs(ws["proc"].values), axis=0)[-1]
        elif ws["proc"].ndim == 1:
            max_index = np.argmax(abs(ws["proc"].values), axis=0)

        starting_width = 10
        starting_center = round(ws["proc"].coords["f2"][max_index])
        starting_phase = np.arctan(
            np.sum(ws["proc"].imag.values) / np.sum(ws["proc"].real.values)
        )

        starting_phase = optPhase(ws, starting_width, starting_center, starting_phase)

        center = optCenter(
            ws,
            starting_width,
            starting_center,
            starting_phase,
        )

        phase = optPhase(
            ws,
            starting_width,
            center,
            starting_phase,
        )

        width = starting_width
        # center, width = optWidth(ws, starting_width, center, phase)

        ws["proc"].values *= np.exp(-1j * phase)

        dnp.dnpTools.integrate(
            ws,
            integrate_center=center,
            integrate_width=width,
        )

        if len(ws["integrals"].values) > 1:
            dnp.dnpFit.exponential_fit(ws, type="T1")
            if folder_num == 304:
                hyd["T10"] = ws["fit"].attrs["T1"]
            else:
                T1.append(ws["fit"].attrs["T1"])
        else:
            if folder_num == 5:
                p0 = ws["integrals"].real.values[0]
            else:
                Ep.append(ws["integrals"].real.values[0] / p0)

    hyd.update(
        {
            "E": np.array(Ep),
            "E_power": np.array(Epowers),
            "T1": np.array(T1),
            "T1_power": np.array(T1powers),
        }
    )

    hydra = dnp.create_workspace()
    hydra.add("hydration_inputs", hyd)

    try:
        hydration_results = dnp.dnpHydration.hydration(hydra)
    except:
        hydration_results = {"ksigma": 0, "tcorr": 0}

    print("Found ksigma = " + str(hydration_results["ksigma"]))
    print("Found tcorr = " + str(hydration_results["tcorr"]))

    return hydration_results["tcorr"], hydration_results["ksigma"]

Total running time of the script: ( 0 minutes 0.000 seconds)

Gallery generated by Sphinx-Gallery