You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

250 lines
9.4 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import pandas as pd
import numpy as np
def gen_steps(
timesteps=340,
channels=6,
p_steps=0.01,
step_ampl_min=0.0002,
step_ampl_max=0.05,
channel_corr_matrix=None
):
if channel_corr_matrix is None:
channel_corr_matrix = np.ones((channels,channels))
start = np.random.binomial(n=1,p=p_steps, size=(timesteps,1))
ampl_params = np.random.random(size=(1,channels))*step_ampl_max + step_ampl_min
sign_params = np.random.choice([-1,1], size=(timesteps, channels), replace=True)
corr_params = channel_corr_matrix[np.random.randint(channels)]
steps = np.cumsum(start * sign_params, axis=0) * ampl_params * corr_params
return steps, np.nonzero(start)[0]
def gen_trend(
timesteps=340,
channels=6,
step_trend_min=5,
step_trend_max=10,
channel_corr_matrix=None
):
trend_params = np.random.choice(np.arange(step_trend_min, max(step_trend_min + 1e-14,step_trend_max)), size=channels, replace=True) / timesteps
if channel_corr_matrix is None:
trend_params = trend_params * np.random.choice([1,-1], size=channels)
else:
trend_params = trend_params * channel_corr_matrix[np.random.randint(channel_corr_matrix.shape[0])]
trend_params = trend_params[np.newaxis,:]
trend = np.transpose(np.tile(np.linspace(0, timesteps, timesteps), reps=(channels,1))) * trend_params
trend = np.concatenate([trend[:timesteps//2],trend[timesteps//2:0:-1]], axis=0)
trend = np.roll(trend, shift=np.random.randint(trend.shape[0]))
return trend
def gen_noise(
timesteps=340,
channels=6,
noise_mean=0,
noise_std_min=0.00001,
noise_std_max=0.00003,
noise_std_stp=0.00001/50
):
noise_std = np.full(channels, fill_value=noise_std_min)
amplification = np.random.choice([1,100], size=channels, p=[0.99,0.01])
noise_std = np.random.choice(np.arange(noise_std_min, noise_std_max + noise_std_stp, noise_std_stp)) * amplification
noise_cov = np.eye(channels)*noise_std
signal = np.random.multivariate_normal(np.repeat(noise_mean, channels), noise_cov, timesteps)
return signal
def gen_harmon_signal(
timesteps=340,
harm_ampl_min=0.02,
harm_ampl_max=0.05,
harm_ampl_step=0.005
):
harm_ampl = np.full(shape=(timesteps,1), fill_value=harm_ampl_min)
harm_ampl_choices = np.arange(start=harm_ampl_min, stop=harm_ampl_max + harm_ampl_step, step=harm_ampl_step)
for i in range(timesteps):
harm_ampl[i] = np.random.choice(harm_ampl_choices)
cos_arg = np.zeros((timesteps, 3))
cos_arg[:, 0] = np.linspace(0, 2 * np.pi, timesteps)
cos_arg[:, 1] = np.linspace(0, 4 * np.pi, timesteps)
if np.random.random() < 0.95:
cos_arg[:, 2] = np.linspace(0, 8 * np.pi, timesteps)
else:
# print("Curvy signal appears!")
cos_arg[:, 2] = np.linspace(0, 64 * np.pi, timesteps)
cos_arg[:, 0] *= 0
cos_arg[:, 1] *= 0
cos_harms = np.cos(cos_arg)
flip = 1 if np.random.random() < 0.5 else -1
cos_harms[:, 0] *= flip
flip = 1 if np.random.random() < 0.5 else -1
cos_harms[:, 1] *= flip
flip = 1 if np.random.random() < 0.5 else -1
cos_harms[:, 2] *= flip
cos_add = cos_harms.sum(axis=1)[:, np.newaxis]
cos_add *= harm_ampl
return cos_add
def gen_scan_motion_signal(
timesteps,
channels,
scale=None,
corr=None,
noise_mean=0,
noise_std_min=0.00001,
noise_std_max=0.00002,
noise_std_stp=0.00001/50,
harm_ampl_min=0.005,
harm_ampl_max=0.01,
harm_ampl_step=0.0001,
probability_steps=0.03,
step_ampl_min=0.008,
step_ampl_max=0.2,
step_trend_min=np.random.random(),
step_trend_max=np.random.random()
):
"""
Example of generated signal
```
import matplotlib.pyplot as plt
from synthetic_dataset_utils import gen_scan_motion_signal
new_signal_fmri_scan, new_signal_fmri_steps = gen_scan_motion_signal(340, 6)
plt.plot(new_signal_fmri_scan)
plt.vlines(new_signal_fmri_steps, ymin=new_signal_fmri_scan.min(), ymax=new_signal_fmri_scan.max(), color='black', linewidth=1)
```
"""
scale = np.ones((1,channels)) if scale is None else scale
signal = np.zeros([timesteps, channels])
signal += gen_noise(
timesteps=timesteps,
channels=channels,
noise_mean=noise_mean,
noise_std_min=noise_std_min,
noise_std_max=noise_std_max,
noise_std_stp=noise_std_stp
)
signal += gen_harmon_signal(
timesteps=timesteps,
harm_ampl_min=harm_ampl_min,
harm_ampl_max=harm_ampl_max,
harm_ampl_step=harm_ampl_step
)
step_signal, step_indexes = gen_steps(
timesteps=timesteps,
channels=channels,
p_steps=probability_steps,
step_ampl_min=step_ampl_min,
step_ampl_max=step_ampl_max,
channel_corr_matrix=corr
)
signal += step_signal
signal += gen_trend(
timesteps=timesteps,
channels=channels,
step_trend_min=step_trend_min,
step_trend_max=step_trend_max,
channel_corr_matrix=None
)
signal *= scale
return signal, step_indexes
def gen_Xy(
sample_num,
timesteps,
channels,
window_size,
noise_mean=0,
noise_std_min=0.0001,
noise_std_max=0.002,
noise_std_stp=0.00001,
harm_ampl_min=0.005,
harm_ampl_max=0.01,
harm_ampl_step=0.0001,
probability_steps=0.03,
step_ampl_min=0.008,
step_ampl_max=0.2,
channel_corr_matrix=None,
scale = None
):
"""
Использует gen_scan_motion_signal для генерации одной fMRI записи.
В каждую запись внедрены аномалии сдвига.
Сигнал нарезается на кусочки размером window_size и маркируются метками двух классов: норма, аномалия.
Кусочки перемешиваются два раза - внутри каждого класса перед уравниванием количества примеров классов и внутри датасета.
"""
scale = np.ones((1,channels)) if scale is None else scale
# ---------------------------- Generate fmri signals ---------------------------------------------
signal_fmri_scans = []
signal_fmri_steps = []
for i in range(sample_num):
signal, step_indexes = gen_scan_motion_signal(
timesteps,
channels,
scale=scale,
corr=None,
noise_mean=noise_mean,
noise_std_min=noise_std_min,
noise_std_max=noise_std_max,
noise_std_stp=noise_std_stp,
harm_ampl_min=harm_ampl_min,
harm_ampl_max=harm_ampl_max,
harm_ampl_step=harm_ampl_step,
probability_steps=probability_steps,
step_ampl_min=step_ampl_min,
step_ampl_max=step_ampl_max,
step_trend_min=np.random.random(),
step_trend_max=np.random.random()*0.5)
signal_fmri_scans.append(signal)
# indexes can be merged like in detect_shifts with window_merge(step_indexes, window_size=window_size)
signal_fmri_steps.append(step_indexes)
signal_fmri_scans = np.stack(signal_fmri_scans)
# -------- Chop singals into short signals with sliding window and divide into two categories: normal and anomaly -----------
normal_indexes = []
anomaly_indexes = []
normal_windows = []
anomaly_windows = []
for scan_idx, (fmri_scan, anomaly_window_starts) in enumerate(zip(signal_fmri_scans, signal_fmri_steps)):
for start in range(fmri_scan.shape[0]):
window_values = fmri_scan[start:start+window_size,:]
if window_values.shape[0] == window_size:
anomaly = False
for anomaly_window_start in anomaly_window_starts:
if (start <= anomaly_window_start-2) and (anomaly_window_start+2 < start+window_size):
anomaly = True
break
if anomaly:
anomaly_windows.append(window_values)
anomaly_indexes.append([scan_idx, start])
else:
normal_windows.append(window_values)
normal_indexes.append([scan_idx, start])
normal_windows = np.array(normal_windows)
anomaly_windows = np.array(anomaly_windows)
# ----- Prepare dataset and labels ------
# To get balanced dataset first shuffle across time and take only number equal to minimal presented class
np.random.shuffle(normal_windows)
np.random.shuffle(anomaly_windows)
normal_windows = normal_windows[:min(len(normal_windows),len(anomaly_windows))]
anomaly_windows = anomaly_windows[:min(len(normal_windows),len(anomaly_windows))]
X = np.concatenate((normal_windows, anomaly_windows), axis=0)
y = np.concatenate([np.repeat(0, normal_windows.shape[0]), np.repeat(1, anomaly_windows.shape[0])])
# Shuffle normal and anomaly examples
shuffled_index = np.arange(X.shape[0])
np.random.shuffle(shuffled_index)
X = X[shuffled_index]
y = y[shuffled_index]
break_point = int(X.shape[0]*0.8)
X_train = X[:break_point]
y_train = y[:break_point]
X_val = X[break_point:]
y_val = y[break_point:]
return X_train, y_train, X_val, y_val