from proxtoolbox.Problems.OrbitalTomog.planar_molecule.orbitaltomog_data_processor import support_from_autocorrelation from proxtoolbox.Utilities.OrbitalTomog import shifted_fft, bin_array, pad_to_square from proxtoolbox.Problems.OrbitalTomog.Graphics.stack_viewer import XYZStackViewer import numpy as np from skimage.io import imread from itertools import combinations def data_processor(config): # Load data inp = imread(config['data_filename']) # Load measurement sensitivity, determine data_zeros sensitivity = imread(config['sensitivity_filename']) # Add the negative K_z part to determine a real-valued orbital? if 'add_negative_kz' in config and config['add_negative_kz']: if 'kz_dimension' not in config: config['kz_dimension'] = 0 inp = np.moveaxis(inp, config['kz_dimension'], 2) sensitivity = np.moveaxis(sensitivity, config['kz_dimension'], 2) inp, sensitivity = mirror_kspace(inp, sensitivity, square_array=True) inp = np.moveaxis(inp, 2, config['kz_dimension']) sensitivity = np.moveaxis(sensitivity, 2, config['kz_dimension']) ny, nx, nz = inp.shape config['data'] = abs(inp) config['data_zeros'] = (sensitivity == 0) # Keep the same resolution? if 'Ny' not in config or 'Nx' not in config or 'Nz' not in config: config['Ny'], config['Nx'], config['Nz'] = ny, nx, nz print('Setting problem dimensions based on data, i.e. %s' % str((ny, nx, nz))) elif ny != config['Ny'] or nx != config['Nx'] or nz != config['Nz']: nandata = np.where(config['data_zeros'], np.nan, config['data']) if not ('from intensity data' in config and config['from intensity data']): # binning must be done for the intensity-data, as that preserves the normalization config['data'] = np.sqrt(np.nan_to_num(bin_array(nandata ** 2, (config['Ny'], config["Nx"], config['Nz'])))) else: config['data'] = np.nan_to_num(bin_array(nandata, (config['Ny'], config["Nx"]))) config["data_zeros"] = bin_array(config['data_zeros'], (config['Ny'], config["Nx"], config['Nz'])) == 0 assert config['data_zeros'].shape == config['data'].shape, 'Non-matching sensitivity and data arrays' # Calculate electric field if 'from intensity data' in config and config['from intensity data']: # avoid sqrt of negative numbers (due to background subtraction) config['data'] = np.where(config['data'] > 0, np.sqrt(abs(config['data'])), np.zeros_like(config['data'])) config['norm_data'] = np.sqrt(np.sum(config['data'] ** 2)) # Object support determination if config['constraint'] in ['support only', 'real and support', 'nonnegative and support']: try: config['support'] = imread(config['support_filename']) except KeyError: # 'support filename does not exist, so define a support here' if 'threshold for support' not in config: config['threshold for support'] = 0.1 config['support'] = support_from_autocorrelation(config['data'], threshold=config['threshold for support'], absolute_autocorrelation=True, binary_dilate_support=1) if ('use_sparsity_with_support' in config and config['use_sparsity_with_support'] and 'sparsity_support' not in config): if 'threshold for support' not in config: config['threshold for support'] = 0.1 config['sparsity_support'] = support_from_autocorrelation(config['data'], threshold=config['threshold for support'], binary_dilate_support=1) # Initial guess ph_init = 2 * np.pi * np.random.random_sample(config['data'].shape) config['u_0'] = config['data'] * np.exp(1j * ph_init) if 'fourier_shift_arrays' in config and config['fourier_shift_arrays']: config['u_0'] = shifted_fft(config['u_0']) else: config['u_0'] = np.fft.fftn(config['u_0']) if config['dataprocessor_plotting']: input_viewer = XYZStackViewer(config['data'], cmap='viridis') config['input viewer'] = input_viewer # the reference keeps the matplotlib object alive support_viewer = XYZStackViewer(shifted_fft(config['data']).real) config['support viewer'] = support_viewer # the reference keeps the matplotlib object alive if 'fourier_shift_arrays' in config and config['fourier_shift_arrays']: for key in ['u_0', 'support', 'sparsity_support', 'data', 'data_zeros']: if key in config: config[key] = np.fft.fftshift(config[key]) # Other settings config['fresnel_nr'] = 0 config['FT_conv_kernel'] = 1 config['use_farfield_formula'] = True config['magn'] = 1 config['data_sq'] = abs(config['data']) ** 2 return config def mirror_kspace(kspace: np.ndarray, sensitivity: np.ndarray = None, shift_mirror: tuple = (1, 1, 1), square_array: bool = True): """ Mirror a 3d kspace array in the kz=0 plane. (where the kz-dimension corresponds to the last axis) :param kspace: kspace array, e.g. from arpes_wvlscan_to_kspace :param sensitivity: if given (from arpes_wvlscan_to_kspace), test :param shift_mirror: array rolling done to get good centering. is tested by sensitivity :param square_array: pad array to be square :return: 3d arrays kspace and sensitivity (if given) """ def mirror_array(arr: np.ndarray, roll=shift_mirror) -> np.ndarray: arr = np.concatenate([np.roll(np.flip(arr), (roll[0], roll[1]), axis=(0, 1)), arr[:, :, 1:]], axis=2) arr = np.roll(arr, roll[2], axis=2) return arr full_kspace = mirror_array(kspace) if square_array: if np.any(np.isnan(full_kspace)): cv = np.nan else: cv = 0 full_kspace = pad_to_square(full_kspace, constant_values=cv) if sensitivity is None: return full_kspace else: mirrored_sens = mirror_array(sensitivity) if np.any(np.isnan(mirrored_sens)): cv = np.nan else: cv = 0 mirrored_sens = pad_to_square(mirrored_sens, constant_values=cv) testslice = [len(mirrored_sens) // i for i in [2, 3, 4]] test_array = np.where(np.isnan(mirrored_sens), 0, 1) for tsl in testslice: testslices = [test_array[tsl], test_array[:, tsl], test_array[:, :, tsl], test_array[-tsl], test_array[:, -tsl], test_array[:, :, -tsl]] test_res = [] for a, b in combinations(testslices, 2): test_res += [np.all(np.isclose(a, b))] assert np.all( test_res), 'Non-matching test slices, indicating that shift_mirror parameter should be changed' return full_kspace, mirrored_sens