You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

250 lines
9.4 KiB
Python

2 years ago
import pandas as pd
import numpy as np
def gen_steps(
timesteps=340,
channels=6,
p_steps=0.01,
step_ampl_min=0.0002,
step_ampl_max=0.05,
channel_corr_matrix=None
):
if channel_corr_matrix is None:
channel_corr_matrix = np.ones((channels,channels))
start = np.random.binomial(n=1,p=p_steps, size=(timesteps,1))
ampl_params = np.random.random(size=(1,channels))*step_ampl_max + step_ampl_min
sign_params = np.random.choice([-1,1], size=(timesteps, channels), replace=True)
corr_params = channel_corr_matrix[np.random.randint(channels)]
steps = np.cumsum(start * sign_params, axis=0) * ampl_params * corr_params
return steps, np.nonzero(start)[0]
def gen_trend(
timesteps=340,
channels=6,
step_trend_min=5,
step_trend_max=10,
channel_corr_matrix=None
):
trend_params = np.random.choice(np.arange(step_trend_min, max(step_trend_min + 1e-14,step_trend_max)), size=channels, replace=True) / timesteps
if channel_corr_matrix is None:
trend_params = trend_params * np.random.choice([1,-1], size=channels)
else:
trend_params = trend_params * channel_corr_matrix[np.random.randint(channel_corr_matrix.shape[0])]
trend_params = trend_params[np.newaxis,:]
trend = np.transpose(np.tile(np.linspace(0, timesteps, timesteps), reps=(channels,1))) * trend_params
trend = np.concatenate([trend[:timesteps//2],trend[timesteps//2:0:-1]], axis=0)
trend = np.roll(trend, shift=np.random.randint(trend.shape[0]))
return trend
def gen_noise(
timesteps=340,
channels=6,
noise_mean=0,
noise_std_min=0.00001,
noise_std_max=0.00003,
noise_std_stp=0.00001/50
):
noise_std = np.full(channels, fill_value=noise_std_min)
amplification = np.random.choice([1,100], size=channels, p=[0.99,0.01])
noise_std = np.random.choice(np.arange(noise_std_min, noise_std_max + noise_std_stp, noise_std_stp)) * amplification
noise_cov = np.eye(channels)*noise_std
signal = np.random.multivariate_normal(np.repeat(noise_mean, channels), noise_cov, timesteps)
return signal
def gen_harmon_signal(
timesteps=340,
harm_ampl_min=0.02,
harm_ampl_max=0.05,
harm_ampl_step=0.005
):
harm_ampl = np.full(shape=(timesteps,1), fill_value=harm_ampl_min)
harm_ampl_choices = np.arange(start=harm_ampl_min, stop=harm_ampl_max + harm_ampl_step, step=harm_ampl_step)
for i in range(timesteps):
harm_ampl[i] = np.random.choice(harm_ampl_choices)
cos_arg = np.zeros((timesteps, 3))
cos_arg[:, 0] = np.linspace(0, 2 * np.pi, timesteps)
cos_arg[:, 1] = np.linspace(0, 4 * np.pi, timesteps)
if np.random.random() < 0.95:
cos_arg[:, 2] = np.linspace(0, 8 * np.pi, timesteps)
else:
# print("Curvy signal appears!")
cos_arg[:, 2] = np.linspace(0, 64 * np.pi, timesteps)
cos_arg[:, 0] *= 0
cos_arg[:, 1] *= 0
cos_harms = np.cos(cos_arg)
flip = 1 if np.random.random() < 0.5 else -1
cos_harms[:, 0] *= flip
flip = 1 if np.random.random() < 0.5 else -1
cos_harms[:, 1] *= flip
flip = 1 if np.random.random() < 0.5 else -1
cos_harms[:, 2] *= flip
cos_add = cos_harms.sum(axis=1)[:, np.newaxis]
cos_add *= harm_ampl
return cos_add
def gen_scan_motion_signal(
timesteps,
channels,
scale=None,
corr=None,
noise_mean=0,
noise_std_min=0.00001,
noise_std_max=0.00002,
noise_std_stp=0.00001/50,
harm_ampl_min=0.005,
harm_ampl_max=0.01,
harm_ampl_step=0.0001,
probability_steps=0.03,
step_ampl_min=0.008,
step_ampl_max=0.2,
step_trend_min=np.random.random(),
step_trend_max=np.random.random()
):
"""
Example of generated signal
```
import matplotlib.pyplot as plt
from synthetic_dataset_utils import gen_scan_motion_signal
new_signal_fmri_scan, new_signal_fmri_steps = gen_scan_motion_signal(340, 6)
plt.plot(new_signal_fmri_scan)
plt.vlines(new_signal_fmri_steps, ymin=new_signal_fmri_scan.min(), ymax=new_signal_fmri_scan.max(), color='black', linewidth=1)
```
"""
scale = np.ones((1,channels)) if scale is None else scale
signal = np.zeros([timesteps, channels])
signal += gen_noise(
timesteps=timesteps,
channels=channels,
noise_mean=noise_mean,
noise_std_min=noise_std_min,
noise_std_max=noise_std_max,
noise_std_stp=noise_std_stp
)
signal += gen_harmon_signal(
timesteps=timesteps,
harm_ampl_min=harm_ampl_min,
harm_ampl_max=harm_ampl_max,
harm_ampl_step=harm_ampl_step
)
step_signal, step_indexes = gen_steps(
timesteps=timesteps,
channels=channels,
p_steps=probability_steps,
step_ampl_min=step_ampl_min,
step_ampl_max=step_ampl_max,
channel_corr_matrix=corr
)
signal += step_signal
signal += gen_trend(
timesteps=timesteps,
channels=channels,
step_trend_min=step_trend_min,
step_trend_max=step_trend_max,
channel_corr_matrix=None
)
signal *= scale
return signal, step_indexes
def gen_Xy(
sample_num,
timesteps,
channels,
window_size,
noise_mean=0,
noise_std_min=0.0001,
noise_std_max=0.002,
noise_std_stp=0.00001,
harm_ampl_min=0.005,
harm_ampl_max=0.01,
harm_ampl_step=0.0001,
probability_steps=0.03,
step_ampl_min=0.008,
step_ampl_max=0.2,
channel_corr_matrix=None,
scale = None
):
"""
Использует gen_scan_motion_signal для генерации одной fMRI записи.
В каждую запись внедрены аномалии сдвига.
Сигнал нарезается на кусочки размером window_size и маркируются метками двух классов: норма, аномалия.
Кусочки перемешиваются два раза - внутри каждого класса перед уравниванием количества примеров классов и внутри датасета.
"""
scale = np.ones((1,channels)) if scale is None else scale
# ---------------------------- Generate fmri signals ---------------------------------------------
signal_fmri_scans = []
signal_fmri_steps = []
for i in range(sample_num):
signal, step_indexes = gen_scan_motion_signal(
timesteps,
channels,
scale=scale,
corr=None,
noise_mean=noise_mean,
noise_std_min=noise_std_min,
noise_std_max=noise_std_max,
noise_std_stp=noise_std_stp,
harm_ampl_min=harm_ampl_min,
harm_ampl_max=harm_ampl_max,
harm_ampl_step=harm_ampl_step,
probability_steps=probability_steps,
step_ampl_min=step_ampl_min,
step_ampl_max=step_ampl_max,
step_trend_min=np.random.random(),
step_trend_max=np.random.random()*0.5)
signal_fmri_scans.append(signal)
# indexes can be merged like in detect_shifts with window_merge(step_indexes, window_size=window_size)
signal_fmri_steps.append(step_indexes)
signal_fmri_scans = np.stack(signal_fmri_scans)
# -------- Chop singals into short signals with sliding window and divide into two categories: normal and anomaly -----------
normal_indexes = []
anomaly_indexes = []
normal_windows = []
anomaly_windows = []
for scan_idx, (fmri_scan, anomaly_window_starts) in enumerate(zip(signal_fmri_scans, signal_fmri_steps)):
for start in range(fmri_scan.shape[0]):
window_values = fmri_scan[start:start+window_size,:]
if window_values.shape[0] == window_size:
anomaly = False
for anomaly_window_start in anomaly_window_starts:
if (start <= anomaly_window_start-2) and (anomaly_window_start+2 < start+window_size):
anomaly = True
break
if anomaly:
anomaly_windows.append(window_values)
anomaly_indexes.append([scan_idx, start])
else:
normal_windows.append(window_values)
normal_indexes.append([scan_idx, start])
normal_windows = np.array(normal_windows)
anomaly_windows = np.array(anomaly_windows)
# ----- Prepare dataset and labels ------
# To get balanced dataset first shuffle across time and take only number equal to minimal presented class
np.random.shuffle(normal_windows)
np.random.shuffle(anomaly_windows)
normal_windows = normal_windows[:min(len(normal_windows),len(anomaly_windows))]
anomaly_windows = anomaly_windows[:min(len(normal_windows),len(anomaly_windows))]
X = np.concatenate((normal_windows, anomaly_windows), axis=0)
y = np.concatenate([np.repeat(0, normal_windows.shape[0]), np.repeat(1, anomaly_windows.shape[0])])
# Shuffle normal and anomaly examples
shuffled_index = np.arange(X.shape[0])
np.random.shuffle(shuffled_index)
X = X[shuffled_index]
y = y[shuffled_index]
break_point = int(X.shape[0]*0.8)
X_train = X[:break_point]
y_train = y[:break_point]
X_val = X[break_point:]
y_val = y[break_point:]
return X_train, y_train, X_val, y_val