import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import os
from sklearn.preprocessing import StandardScaler, MinMaxScaler
DATADIR = "wave_data/"
Vous trouverez dans le répertoire wave_data/
des signaux qui ont été générés pour l'exercice, nous allons vous demander de faire quelques manipulations afin de lire, transformer, analyser, etc.
Le but de cet exercice est simplement de valider que vous êtes en mesure de prendre rapidement en main python si vous devez rejoindre l'équipe. Il s'agit principalement de faire appel à des fonctions et des modules que nous utilisons régulièrement.
Si vous n'avez jamais fait de python, vous pouvez facilement trouver des éléments de réponses sur internet, et écrire du code fonctionnel.
Une fois le notebook rempli, il ne vous reste plus qu'à nous le renvoyer par email à l'adresse : [email protected].
Ouvrir un des fichiers du répertoire wave_data à l'aide de numpy
via la fonction genfromtxt
lst_signal = []
dirs = os.listdir(DATADIR)
for file in dirs:
lst_signal.append(np.genfromtxt(DATADIR + file))
len(lst_signal)
Ouvrir et regrouper l'ensemble des signaux dans le répertoire wave_data/ dans une liste regroupant des numpy.array
.
Former une seconde liste de même taille, contenant le numéro du fichier associé au signal dans la première liste à la même position, au format entier (le numéro étant x dans signal_x.txt).
Bien évidement les listes devront correspondre, nous conseillons l'utilisation d'une boucle for
de os.listdir
et des méthode associées au format str
tel que str.split
.
lst_idx = []
for file in dirs:
idx = file.split('.')[0].split('_')[1]
lst_idx.append(idx)
lst_idx = list(map(int, lst_idx))
len(lst_idx)
Afficher quelques plots des signaux en utilisant matplotlib
(plt.plot
)
Afficher l'ensemble des signaux avec plt.imshow
def plot_signal(arr, num_plots=4, name='signal'):
plt.figure(figsize=(18, 10), dpi= 60)
sp_idx = 1
for i in range(num_plots//2 * 2):
plt.subplot(num_plots//2, 2, sp_idx)
plt.plot(arr[i])
plt.title(name + ' ' + str(lst_idx[i]), fontsize=12)
sp_idx += 1
plt.tight_layout() # adjust spacing between subplots to minimize the overlaps
plot_signal(lst_signal) # plots 4 signaux
Apres avoir converti la liste en un array de taille (nb_sample, nb_feature), normaliser les signaux en utilisant MinMaxScaler
de scikit-learn
arr_signal = np.asarray(lst_signal)
arr_signal.shape # n_sanples x n_features
scaler = MinMaxScaler()
scaler.fit(arr_signal)
arr_signal_nor = scaler.transform(arr_signal) # version normalisé
Expliquer ce que fait la fonction.
Il calcule le maximum et le minimum de chaque caractéristique (feature) et les utilise ensuite pour mettre à l'échelle toutes les valeurs de cette caractéristique avec celles de [0,1].
Representer graphiquement avec plot
quelques courbes de resultat, cela vous parait il correct? Ce resultat vous parait-il adequat?
plot_signal(arr_signal_nor, name="signal normalisé") # plots 4 signaux
Ils ne sont pas les mêmes qu'à l'origine mais c'est raisonnable car nous avons fait une échelle de valeurs à celles de [0,1]. Par conséquent, les courbes semblent plus étroites (plus bruyantes).
Appliquer la fonction écrite ci-dessous à notre set de données. Ploter quelques resultats.
def global_min_max_rescale(X):
global_min = np.min(X)
global_max = np.max(X)
return (X-global_min)/(global_max-global_min)
arr_signal_global = global_min_max_rescale(arr_signal)
plot_signal(arr_signal_global, name="signal normalisé global") # plots 4 signaux
Expliquer la différence avec la méthode précédente
Dans ce cas, nous calculons min et max de toutes les valeurs dans toutes les features (alors que dans le cas de MinMaxScaler
, nous prenons seulement min et max des valeurs dans chaque feature). C'est pourquoi dans le cas de l'utilisation de global_min_max_rescale
, on obtient la même forme de plots que ceux qui sont originaux.
OPTIONNEL : Vous pouvez aussi tester d'autres méthodes de scaling, fournies ou non pas sklearn
Nous utilisonsStandardScaler
.
scaler = StandardScaler()
scaler.fit(arr_signal)
arr_signal_std = scaler.transform(arr_signal)
plot_signal(arr_signal_std, name="signal std")
Récupérer les vecteurs amplitudes et fréquences associées en appliquant une FFT sur le signal normé avec la seconde méthode (on considère que la fréquence d'échantillonnage vaut 100 Hz)
aidez vous du module fft
de numpy
arr_signal_global_fft = np.fft.fft(arr_signal_global)
arr_signal_global_fft.shape
# les vecteurs amplitudes
arr_amplitudes = np.abs(arr_signal_global_fft)
arr_amplitudes.shape
# les vecteurs fréquences
arr_freq = np.fft.fftfreq(len(arr_signal_global[0]), d=0.01)
arr_freq
Faire un plot du spectre FFT.
plt.figure(figsize=(18, 10), dpi= 60)
sp_idx = 1
for i in range(4):
plt.subplot(2, 2, sp_idx)
plt.bar(arr_freq, arr_amplitudes[i], width=1.5)
plt.title('spectre ' + str(lst_idx[i]), fontsize=12)
sp_idx += 1
plt.ylabel("Amplitude")
plt.xlabel("Frequency [Hz]")
# plt.show()
plt.tight_layout() # ajuster l'espacement entre les sous-plots pour minimiser les chevauchements
OPTIONNEL : vous pouvez proposer d'autres features pour le traitement du signal
votre réponse ici
On cherche maintenant à reduire le nombre de dimension de feature afin d'améliorer les futures procédures d'apprentissage. Proposer au moins 2 méthodes, implémenter au moins 1.
from sklearn.decomposition import PCA
pca = PCA(n_components=100, whiten=False, random_state=42)
arr_signal_reduce = pca.fit_transform(arr_signal)
print('arr_signal_reduce.shape = ', arr_signal_reduce.shape)
print("variance retained: ", round(np.cumsum(pca.explained_variance_ratio_)[-1]*100), '%')
plot_signal(arr_signal_reduce) # plots 4 signaux
Nous allons maintenant chercher à détecter les signaux anormaux du dataset(et oui il y a en a). Ceci correspond à un apprentissage non supervisé. Nous vous conseillons d'explorer le lien ci-dessous.
http://scikit-learn.org/stable/modules/outlier_detection.html Si vous êtes perdu ou ne savez pas par quoi commencer, nous recommandons d'essayer OneClassSVM ou IsolationForest.
Si vous souhaitez proposer et illustrer une autre méthode, n'hésitez pas et faites vous plaisir.
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest
Appliquer les differents outils d'outlier detection aux differentes données normées (brut ou fft avec ou sans reduction de dimensions)
# La fonction pour trouver les signaux anormaux en utilisant la méthode OneClassSVM.
def one_class_signal(X_signal, arg_nu=0.5, arg_kernel='rbf', arg_gamma='auto', arg_degree=3, custom_text=""):
cls = OneClassSVM(nu=arg_nu, kernel=arg_kernel, gamma=arg_gamma, degree=arg_degree)
cls.fit(X_signal)
pred = cls.predict(X_signal)
unique, counts = np.unique(pred, return_counts=True)
if unique.shape[0]>1:
print("Inliers/Outliers (1/-1):", counts[1], "/", counts[0], custom_text)
else: # Il n'y a pas d'anormalité ou il y en a toutes
if unique[0]==-1:
print("Inliers/Outliers (1/-1): 0 /", counts[0], custom_text)
else:
print("Inliers/Outliers (1/-1):", counts[0], "/ 0", custom_text)
idx_signal_anormal = np.where(pred==-1)[0].tolist() # liste des indeces des signaux anormaux
if len(idx_signal_anormal)>0:
return [lst_idx[i] for i in idx_signal_anormal] # liste de signaux anormaux
else:
print('Il n\'y pas de signal anormal!')
D'abord, on veut essayer avec des noyaux différents.
## One-Class SVM aux data bruts
# kernel=poly
lst_sgn_anrml_brut = one_class_signal(arr_signal, arg_nu=0.1,
arg_kernel="poly", arg_degree=5,
arg_gamma='auto', custom_text="(kernel=poly)");
# kernel=sigmoid
one_class_signal(arr_signal, arg_nu=0.1, arg_kernel="sigmoid", arg_gamma='auto', custom_text="(kernel=sigmoid)");
# kernel=rbf
one_class_signal(arr_signal, arg_nu=0.1, arg_kernel="rbf", arg_gamma='auto', custom_text="(kernel=rbf)");
## One-Class SVM aux data avec reduction de dimensions
# kernel=poly
one_class_signal(arr_signal_reduce, arg_nu=0.1, arg_kernel="poly", arg_degree=8, custom_text="(kernel=poly)")
# kernel=rbf
lst_sgn_anrml_reduce = one_class_signal(arr_signal_reduce, arg_nu=0.1,
arg_kernel="rbf", arg_gamma=0.0001, custom_text="(kernel=rbf)");
## One-Class SVM aux data normalisés (MinMaxScaler)
# kernel=poly
one_class_signal(arr_signal_nor, arg_nu=0.1, arg_kernel="poly", arg_degree=4, custom_text="(kernel=poly)")
# kernel=rbf
lst_sgn_anrml_nor = one_class_signal(arr_signal_nor, arg_nu=0.1,
arg_kernel="rbf", arg_gamma=0.01, custom_text="(kernel=rbf)");
Proposer une liste de signaux anormaux
print("Data Brut: ", lst_sgn_anrml_brut)
print("Data reducé: ", lst_sgn_anrml_reduce)
print("Data normalisé: ", lst_sgn_anrml_nor)
La liste des signaux anormaux peut être la suivante:
lst_signaux_anormaux = ['signal_' + str(i) for i in lst_sgn_anrml_reduce if i in lst_sgn_anrml_nor]
lst_signaux_anormaux
Nous allons faire quelques plots pour comparer les signaux "normaux" avec nos signaux anormaux trouvés pour voir les différences.
plt.figure(figsize=(18, 4), dpi= 60)
# des signaux normaux
plt.subplot(1, 3, 1)
plt.plot(arr_signal[2], c='r')
plt.plot(arr_signal[3], c='b')
plt.plot(arr_signal[4], c='g')
plt.legend(['normal', 'normal', 'normal'])
plt.title('Des signaux normaux.')
# des signaux anormaux (dans lst_signaux_anormaux)
plt.subplot(1, 3, 2)
plt.plot(arr_signal[58], c='r')
plt.plot(arr_signal[63], c='b')
plt.plot(arr_signal[73], c='g')
plt.legend(['anormal', 'anormal', 'anormal'])
plt.title('Des signaux anormaux.')
# des signaux anormaux en comparaison avec un signal normal
plt.subplot(1, 3, 3)
plt.plot(arr_signal[2], c='r')
plt.plot(arr_signal[63], c='b')
plt.plot(arr_signal[73], c='g')
plt.legend(['normal', 'anormal', 'anormal'])
plt.title('Des signaux anormaux en comparaison avec un signal normal.')
plt.tight_layout() # ajuster l'espacement entre les sous-plots pour minimiser les chevauchements
Il existe une méthode de compression de données en Deep learning qui permet d'extraire des données non lineaires. Ci-dessous des exemples de codes écrits en keras.
https://blog.keras.io/building-autoencoders-in-keras.html
Il peut être intéressant pour du traitement du signal d'utiliser une convolution 1D.
Proposer un autoencoder appliqué à nos données
import tensorflow as tf
tf.keras.backend.set_floatx('float64')
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
IDÉE: Nous entraînerons les données pour obtenir leurs reconstructions et les comparerons aux données originales. Dans 100 échantillons, nous choisissons 10 échantillons les plus différents pour être des outliers. Nous travaillerons sur la version normalisée des signaux (arr_signal_nor
).
encoding_dim = 30
input_dim = arr_signal_nor.shape[1] # 200 features
# this is our input placeholder
input_arr = Input(shape=(input_dim,))
encoded = Dense(encoding_dim, activation='relu')(input_arr)
decoded = Dense(input_dim, activation='sigmoid')(encoded)
# this model maps an input to its reconstruction
autoencoder = Model(inputs=input_arr, outputs=decoded)
# autoencoder.summary()
autoencoder.compile(optimizer='adam', loss='mse')
nb_epoch = 100
batch_size = 50
size_test = 0.1
arr_train, arr_test = train_test_split(arr_signal_nor, test_size=size_test)
autoencoder.fit(arr_train, arr_train,
epochs=nb_epoch,
batch_size=batch_size,
shuffle=True,
validation_data=(arr_test, arr_test),
verbose=0
)
Proposer une methode de detection d'anomalie basée sur un autoencoder
arr_predict = autoencoder.predict(arr_signal_nor)
# normaliser arr_predict
scaler = MinMaxScaler()
scaler.fit(arr_predict)
arr_predict_nor = scaler.transform(arr_predict) # version normalisé
differeces = np.linalg.norm(arr_signal_nor - arr_predict_nor, axis=-1)
res = sorted(range(len(differeces)), key = lambda sub: differeces[sub])[-10:]
lst_signaux_anormaux = [lst_idx[i] for i in res]
print("Liste d'index des 10 signaux anormaux. : " + str(lst_signaux_anormaux))
plt.scatter(lst_idx, differeces)
plt.scatter(lst_signaux_anormaux, differeces[res], c='r')
for i, txt in enumerate(lst_signaux_anormaux):
plt.annotate(txt, (lst_signaux_anormaux[i]-0.1, differeces[res[i]]+0.05))
plt.ylabel("distance")
plt.xlabel("l'index de signaux")
plt.legend(['signaux normaux', 'signaux anormaux'], loc='upper left')
Je n'ai pas assez de temps pour faire cette tâche.