Dataswati's test

Imports

In [1]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
In [2]:
import os
In [3]:
from sklearn.preprocessing import StandardScaler, MinMaxScaler
In [4]:
DATADIR = "wave_data/"

Introduction

Vous trouverez dans le répertoire wave_data/ des signaux qui ont été générés pour l'exercice, nous allons vous demander de faire quelques manipulations afin de lire, transformer, analyser, etc.

Le but de cet exercice est simplement de valider que vous êtes en mesure de prendre rapidement en main python si vous devez rejoindre l'équipe. Il s'agit principalement de faire appel à des fonctions et des modules que nous utilisons régulièrement.

Si vous n'avez jamais fait de python, vous pouvez facilement trouver des éléments de réponses sur internet, et écrire du code fonctionnel.

Une fois le notebook rempli, il ne vous reste plus qu'à nous le renvoyer par email à l'adresse : [email protected].

import et visualisation du code

Ouvrir un des fichiers du répertoire wave_data à l'aide de numpy via la fonction genfromtxt

In [5]:
lst_signal = []
dirs = os.listdir(DATADIR)
for file in dirs:
    lst_signal.append(np.genfromtxt(DATADIR + file))

len(lst_signal)
Out[5]:
100

Ouvrir et regrouper l'ensemble des signaux dans le répertoire wave_data/ dans une liste regroupant des numpy.array.

Former une seconde liste de même taille, contenant le numéro du fichier associé au signal dans la première liste à la même position, au format entier (le numéro étant x dans signal_x.txt).

Bien évidement les listes devront correspondre, nous conseillons l'utilisation d'une boucle for de os.listdir et des méthode associées au format str tel que str.split.

In [93]:
lst_idx = []
for file in dirs:
    idx = file.split('.')[0].split('_')[1]
    lst_idx.append(idx)

lst_idx = list(map(int, lst_idx))
len(lst_idx)
Out[93]:
100

Afficher quelques plots des signaux en utilisant matplotlib (plt.plot)
Afficher l'ensemble des signaux avec plt.imshow

In [94]:
def plot_signal(arr, num_plots=4, name='signal'):
    plt.figure(figsize=(18, 10), dpi= 60)
    sp_idx = 1
    for i in range(num_plots//2 * 2):
        plt.subplot(num_plots//2, 2, sp_idx)
        plt.plot(arr[i])
        plt.title(name + ' ' + str(lst_idx[i]), fontsize=12)
        sp_idx += 1
    plt.tight_layout() # adjust spacing between subplots to minimize the overlaps
In [95]:
plot_signal(lst_signal) # plots 4 signaux

Pretraitement et featuring

Apres avoir converti la liste en un array de taille (nb_sample, nb_feature), normaliser les signaux en utilisant MinMaxScaler de scikit-learn

In [9]:
arr_signal = np.asarray(lst_signal)
arr_signal.shape # n_sanples x n_features
Out[9]:
(100, 200)
In [10]:
scaler = MinMaxScaler()
scaler.fit(arr_signal)
arr_signal_nor = scaler.transform(arr_signal) # version normalisé

Expliquer ce que fait la fonction.

Il calcule le maximum et le minimum de chaque caractéristique (feature) et les utilise ensuite pour mettre à l'échelle toutes les valeurs de cette caractéristique avec celles de [0,1].

Representer graphiquement avec plot quelques courbes de resultat, cela vous parait il correct? Ce resultat vous parait-il adequat?

In [96]:
plot_signal(arr_signal_nor, name="signal normalisé") # plots 4 signaux

Ils ne sont pas les mêmes qu'à l'origine mais c'est raisonnable car nous avons fait une échelle de valeurs à celles de [0,1]. Par conséquent, les courbes semblent plus étroites (plus bruyantes).

Appliquer la fonction écrite ci-dessous à notre set de données. Ploter quelques resultats.

In [12]:
def global_min_max_rescale(X):
    global_min = np.min(X)
    global_max = np.max(X)
    return (X-global_min)/(global_max-global_min)
In [13]:
arr_signal_global = global_min_max_rescale(arr_signal)
plot_signal(arr_signal_global, name="signal normalisé global") # plots 4 signaux

Expliquer la différence avec la méthode précédente

Dans ce cas, nous calculons min et max de toutes les valeurs dans toutes les features (alors que dans le cas de MinMaxScaler, nous prenons seulement min et max des valeurs dans chaque feature). C'est pourquoi dans le cas de l'utilisation de global_min_max_rescale, on obtient la même forme de plots que ceux qui sont originaux.

OPTIONNEL : Vous pouvez aussi tester d'autres méthodes de scaling, fournies ou non pas sklearn

Nous utilisonsStandardScaler.

In [14]:
scaler = StandardScaler()
scaler.fit(arr_signal)
arr_signal_std = scaler.transform(arr_signal)
plot_signal(arr_signal_std, name="signal std")

Récupérer les vecteurs amplitudes et fréquences associées en appliquant une FFT sur le signal normé avec la seconde méthode (on considère que la fréquence d'échantillonnage vaut 100 Hz)

aidez vous du module fft de numpy

In [15]:
arr_signal_global_fft = np.fft.fft(arr_signal_global)
arr_signal_global_fft.shape
Out[15]:
(100, 200)
In [16]:
# les vecteurs amplitudes
arr_amplitudes = np.abs(arr_signal_global_fft)
arr_amplitudes.shape
Out[16]:
(100, 200)
In [17]:
# les vecteurs fréquences
arr_freq = np.fft.fftfreq(len(arr_signal_global[0]), d=0.01)
arr_freq
Out[17]:
array([  0. ,   0.5,   1. ,   1.5,   2. ,   2.5,   3. ,   3.5,   4. ,
         4.5,   5. ,   5.5,   6. ,   6.5,   7. ,   7.5,   8. ,   8.5,
         9. ,   9.5,  10. ,  10.5,  11. ,  11.5,  12. ,  12.5,  13. ,
        13.5,  14. ,  14.5,  15. ,  15.5,  16. ,  16.5,  17. ,  17.5,
        18. ,  18.5,  19. ,  19.5,  20. ,  20.5,  21. ,  21.5,  22. ,
        22.5,  23. ,  23.5,  24. ,  24.5,  25. ,  25.5,  26. ,  26.5,
        27. ,  27.5,  28. ,  28.5,  29. ,  29.5,  30. ,  30.5,  31. ,
        31.5,  32. ,  32.5,  33. ,  33.5,  34. ,  34.5,  35. ,  35.5,
        36. ,  36.5,  37. ,  37.5,  38. ,  38.5,  39. ,  39.5,  40. ,
        40.5,  41. ,  41.5,  42. ,  42.5,  43. ,  43.5,  44. ,  44.5,
        45. ,  45.5,  46. ,  46.5,  47. ,  47.5,  48. ,  48.5,  49. ,
        49.5, -50. , -49.5, -49. , -48.5, -48. , -47.5, -47. , -46.5,
       -46. , -45.5, -45. , -44.5, -44. , -43.5, -43. , -42.5, -42. ,
       -41.5, -41. , -40.5, -40. , -39.5, -39. , -38.5, -38. , -37.5,
       -37. , -36.5, -36. , -35.5, -35. , -34.5, -34. , -33.5, -33. ,
       -32.5, -32. , -31.5, -31. , -30.5, -30. , -29.5, -29. , -28.5,
       -28. , -27.5, -27. , -26.5, -26. , -25.5, -25. , -24.5, -24. ,
       -23.5, -23. , -22.5, -22. , -21.5, -21. , -20.5, -20. , -19.5,
       -19. , -18.5, -18. , -17.5, -17. , -16.5, -16. , -15.5, -15. ,
       -14.5, -14. , -13.5, -13. , -12.5, -12. , -11.5, -11. , -10.5,
       -10. ,  -9.5,  -9. ,  -8.5,  -8. ,  -7.5,  -7. ,  -6.5,  -6. ,
        -5.5,  -5. ,  -4.5,  -4. ,  -3.5,  -3. ,  -2.5,  -2. ,  -1.5,
        -1. ,  -0.5])

Faire un plot du spectre FFT.

In [18]:
plt.figure(figsize=(18, 10), dpi= 60)
sp_idx = 1
for i in range(4):
    plt.subplot(2, 2, sp_idx)
    plt.bar(arr_freq, arr_amplitudes[i], width=1.5)
    plt.title('spectre ' + str(lst_idx[i]), fontsize=12)
    sp_idx += 1
    plt.ylabel("Amplitude")
    plt.xlabel("Frequency [Hz]")

# plt.show()
plt.tight_layout() # ajuster l'espacement entre les sous-plots pour minimiser les chevauchements

OPTIONNEL : vous pouvez proposer d'autres features pour le traitement du signal

votre réponse ici

Reduction de dimension

On cherche maintenant à reduire le nombre de dimension de feature afin d'améliorer les futures procédures d'apprentissage. Proposer au moins 2 méthodes, implémenter au moins 1.

In [19]:
from sklearn.decomposition import PCA
In [20]:
pca = PCA(n_components=100, whiten=False, random_state=42)
arr_signal_reduce = pca.fit_transform(arr_signal)

print('arr_signal_reduce.shape = ', arr_signal_reduce.shape)
print("variance retained: ", round(np.cumsum(pca.explained_variance_ratio_)[-1]*100), '%')
arr_signal_reduce.shape =  (100, 100)
variance retained:  100.0 %
In [21]:
plot_signal(arr_signal_reduce) # plots 4 signaux

Detection d'anomalie

Nous allons maintenant chercher à détecter les signaux anormaux du dataset(et oui il y a en a). Ceci correspond à un apprentissage non supervisé. Nous vous conseillons d'explorer le lien ci-dessous.

http://scikit-learn.org/stable/modules/outlier_detection.html Si vous êtes perdu ou ne savez pas par quoi commencer, nous recommandons d'essayer OneClassSVM ou IsolationForest.

Si vous souhaitez proposer et illustrer une autre méthode, n'hésitez pas et faites vous plaisir.

In [22]:
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest

Appliquer les differents outils d'outlier detection aux differentes données normées (brut ou fft avec ou sans reduction de dimensions)

In [97]:
# La fonction pour trouver les signaux anormaux en utilisant la méthode OneClassSVM. 
def one_class_signal(X_signal, arg_nu=0.5, arg_kernel='rbf', arg_gamma='auto', arg_degree=3, custom_text=""):
    cls = OneClassSVM(nu=arg_nu, kernel=arg_kernel, gamma=arg_gamma, degree=arg_degree)
    cls.fit(X_signal)
    pred = cls.predict(X_signal)
    unique, counts = np.unique(pred, return_counts=True)
    if unique.shape[0]>1:
        print("Inliers/Outliers (1/-1):", counts[1], "/", counts[0], custom_text)
    else: # Il n'y a pas d'anormalité ou il y en a toutes
        if unique[0]==-1:
            print("Inliers/Outliers (1/-1): 0 /", counts[0], custom_text)
        else:
            print("Inliers/Outliers (1/-1):", counts[0], "/ 0", custom_text)
    idx_signal_anormal = np.where(pred==-1)[0].tolist() # liste des indeces des signaux anormaux
    if len(idx_signal_anormal)>0:
        return [lst_idx[i] for i in idx_signal_anormal]  # liste de signaux anormaux
    else:
        print('Il n\'y pas de signal anormal!')

D'abord, on veut essayer avec des noyaux différents.

In [98]:
## One-Class SVM aux data bruts 

# kernel=poly
lst_sgn_anrml_brut = one_class_signal(arr_signal, arg_nu=0.1, 
                                      arg_kernel="poly", arg_degree=5, 
                                      arg_gamma='auto', custom_text="(kernel=poly)");

# kernel=sigmoid
one_class_signal(arr_signal, arg_nu=0.1, arg_kernel="sigmoid", arg_gamma='auto', custom_text="(kernel=sigmoid)");

# kernel=rbf
one_class_signal(arr_signal, arg_nu=0.1, arg_kernel="rbf", arg_gamma='auto', custom_text="(kernel=rbf)");
Inliers/Outliers (1/-1): 90 / 10 (kernel=poly)
Inliers/Outliers (1/-1): 0 / 100 (kernel=sigmoid)
Inliers/Outliers (1/-1): 40 / 60 (kernel=rbf)
In [99]:
## One-Class SVM aux data avec reduction de dimensions 

# kernel=poly
one_class_signal(arr_signal_reduce, arg_nu=0.1, arg_kernel="poly", arg_degree=8, custom_text="(kernel=poly)")

# kernel=rbf
lst_sgn_anrml_reduce = one_class_signal(arr_signal_reduce, arg_nu=0.1, 
                                        arg_kernel="rbf", arg_gamma=0.0001, custom_text="(kernel=rbf)");
Inliers/Outliers (1/-1): 53 / 47 (kernel=poly)
Inliers/Outliers (1/-1): 88 / 12 (kernel=rbf)
In [100]:
## One-Class SVM aux data normalisés (MinMaxScaler)

# kernel=poly
one_class_signal(arr_signal_nor, arg_nu=0.1, arg_kernel="poly", arg_degree=4, custom_text="(kernel=poly)")

# kernel=rbf
lst_sgn_anrml_nor = one_class_signal(arr_signal_nor, arg_nu=0.1, 
                                     arg_kernel="rbf", arg_gamma=0.01, custom_text="(kernel=rbf)");
Inliers/Outliers (1/-1): 85 / 15 (kernel=poly)
Inliers/Outliers (1/-1): 90 / 10 (kernel=rbf)

Proposer une liste de signaux anormaux

In [101]:
print("Data Brut: ", lst_sgn_anrml_brut)
print("Data reducé: ", lst_sgn_anrml_reduce)
print("Data normalisé: ", lst_sgn_anrml_nor)
Data Brut:  [1, 11, 46, 6, 71, 72, 80, 83, 88, 93]
Data reducé:  [53, 55, 58, 60, 63, 64, 66, 67, 68, 69, 70, 73]
Data normalisé:  [51, 58, 61, 62, 63, 64, 66, 67, 70, 73]

La liste des signaux anormaux peut être la suivante:

In [103]:
lst_signaux_anormaux = ['signal_' + str(i) for i in lst_sgn_anrml_reduce if i in lst_sgn_anrml_nor]
lst_signaux_anormaux
Out[103]:
['signal_58',
 'signal_63',
 'signal_64',
 'signal_66',
 'signal_67',
 'signal_70',
 'signal_73']

Nous allons faire quelques plots pour comparer les signaux "normaux" avec nos signaux anormaux trouvés pour voir les différences.

In [29]:
plt.figure(figsize=(18, 4), dpi= 60)

# des signaux normaux
plt.subplot(1, 3, 1)
plt.plot(arr_signal[2], c='r')
plt.plot(arr_signal[3], c='b')
plt.plot(arr_signal[4], c='g')
plt.legend(['normal', 'normal', 'normal'])
plt.title('Des signaux normaux.')

# des signaux anormaux (dans lst_signaux_anormaux)
plt.subplot(1, 3, 2)
plt.plot(arr_signal[58], c='r')
plt.plot(arr_signal[63], c='b')
plt.plot(arr_signal[73], c='g')
plt.legend(['anormal', 'anormal', 'anormal'])
plt.title('Des signaux anormaux.')

# des signaux anormaux en comparaison avec un signal normal
plt.subplot(1, 3, 3)
plt.plot(arr_signal[2], c='r')
plt.plot(arr_signal[63], c='b')
plt.plot(arr_signal[73], c='g')
plt.legend(['normal', 'anormal', 'anormal'])
plt.title('Des signaux anormaux en comparaison avec un signal normal.')

plt.tight_layout() # ajuster l'espacement entre les sous-plots pour minimiser les chevauchements

Autoencoder

Il existe une méthode de compression de données en Deep learning qui permet d'extraire des données non lineaires. Ci-dessous des exemples de codes écrits en keras.

https://blog.keras.io/building-autoencoders-in-keras.html

Il peut être intéressant pour du traitement du signal d'utiliser une convolution 1D.

https://machinelearningmastery.com/cnn-models-for-human-activity-recognition-time-series-classification/

Proposer un autoencoder appliqué à nos données

In [30]:
import tensorflow as tf
tf.keras.backend.set_floatx('float64')

from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.models import Model

from sklearn.model_selection import train_test_split

IDÉE: Nous entraînerons les données pour obtenir leurs reconstructions et les comparerons aux données originales. Dans 100 échantillons, nous choisissons 10 échantillons les plus différents pour être des outliers. Nous travaillerons sur la version normalisée des signaux (arr_signal_nor).

In [70]:
encoding_dim = 30
input_dim = arr_signal_nor.shape[1] # 200 features

# this is our input placeholder
input_arr = Input(shape=(input_dim,))

encoded = Dense(encoding_dim, activation='relu')(input_arr)
decoded = Dense(input_dim, activation='sigmoid')(encoded)

# this model maps an input to its reconstruction
autoencoder = Model(inputs=input_arr, outputs=decoded)
# autoencoder.summary()

autoencoder.compile(optimizer='adam', loss='mse')

nb_epoch = 100
batch_size = 50
size_test = 0.1
arr_train, arr_test = train_test_split(arr_signal_nor, test_size=size_test)

autoencoder.fit(arr_train, arr_train,
                epochs=nb_epoch,
                batch_size=batch_size,
                shuffle=True,
                validation_data=(arr_test, arr_test),
                verbose=0
                )
Out[70]:
<tensorflow.python.keras.callbacks.History at 0x1bbf47953c8>

Proposer une methode de detection d'anomalie basée sur un autoencoder

In [71]:
arr_predict = autoencoder.predict(arr_signal_nor)
In [72]:
# normaliser arr_predict
scaler = MinMaxScaler()
scaler.fit(arr_predict)
arr_predict_nor = scaler.transform(arr_predict) # version normalisé
In [106]:
differeces = np.linalg.norm(arr_signal_nor - arr_predict_nor, axis=-1)
res = sorted(range(len(differeces)), key = lambda sub: differeces[sub])[-10:]
lst_signaux_anormaux = [lst_idx[i] for i in res]
print("Liste d'index des 10 signaux anormaux. : " + str(lst_signaux_anormaux))
Liste d'index des 10 signaux anormaux. : [74, 62, 60, 65, 51, 63, 56, 52, 64, 68]
In [119]:
plt.scatter(lst_idx, differeces)
plt.scatter(lst_signaux_anormaux, differeces[res], c='r')

for i, txt in enumerate(lst_signaux_anormaux):
    plt.annotate(txt, (lst_signaux_anormaux[i]-0.1, differeces[res[i]]+0.05))

plt.ylabel("distance")
plt.xlabel("l'index de signaux")
plt.legend(['signaux normaux', 'signaux anormaux'], loc='upper left')
Out[119]:
<matplotlib.legend.Legend at 0x1bbf6ae7a58>

Vous pouvez proposer et tester ici d'autres approches pour traiter le même problème initial

Je n'ai pas assez de temps pour faire cette tâche.