共计 8221 个字符,预计需要花费 21 分钟才能阅读完成。
人体流动辨认(HAR)是一种应用人工智能(AI)从智能手表等流动记录设施产生的原始数据中辨认人类流动的办法。当人们执行某种动作时,人们佩戴的传感器(智能手表、手环、专用设备等)就会产生信号。这些收集信息的传感器包含加速度计、陀螺仪和磁力计。人类流动辨认有各种各样的利用,从为病人和残疾人提供帮忙到像游戏这样重大依赖于剖析静止技能的畛域。咱们能够将这些人类流动辨认技术大抵分为两类: 固定传感器和挪动传感器。在本文中,咱们应用挪动传感器产生的原始数据来辨认人类流动。
在本文中,我将应用 LSTM (Long – term Memory) 和 CNN (Convolutional Neural Network) 来辨认上面的人类流动:
- 下楼
- 上楼
- 跑步
- 坐着
- 站立
- 步行
概述
你可能会思考为什么咱们要应用 LSTM-CNN 模型而不是根本的机器学习办法?
机器学习办法在很大水平上依赖于启发式手动特征提取人类流动辨认工作,而咱们这里须要做的是端到端的学习,简化了启发式手动提取特色的操作。
我将要应用的模型是一个深神经网络,该网络是 LSTM 和 CNN 的组合造成的,并且具备提取流动特色和仅应用模型参数进行分类的能力。
这里咱们应用 WISDM 数据集,总计 1.098.209 样本。通过咱们的训练,模型的 F1 得分为 0.96,在测试集上,F1 得分为 0.89。
导入库
首先,咱们将导入咱们将须要的所有必要库。
from pandas import read_csv, unique | |
import numpy as np | |
from scipy.interpolate import interp1d | |
from scipy.stats import mode | |
from sklearn.preprocessing import LabelEncoder | |
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay | |
from tensorflow import stack | |
from tensorflow.keras.utils import to_categorical | |
from keras.models import Sequential | |
from keras.layers import Dense, GlobalAveragePooling1D, BatchNormalization, MaxPool1D, Reshape, Activation | |
from keras.layers import Conv1D, LSTM | |
from keras.callbacks import ModelCheckpoint, EarlyStopping | |
import matplotlib.pyplot as plt | |
%matplotlib inline | |
import warnings | |
warnings.filterwarnings("ignore") |
咱们将应用 Sklearn,Tensorflow,Keras,Scipy 和 Numpy 来构建模型和进行数据预处理。应用 PANDAS 进行数据加载,应用 matplotlib 进行数据可视化。
数据集加载和可视化
WISDM 是由集体腰间携带的挪动设施上的减速计记录下来。该数据收集是由集体监督的能够确保数据的品质。咱们将应用的文件是 WISDM_AR_V1.1_RAW.TXT。应用 PANDAS,能够将数据集加载到 DataAframe 中,如上面代码:
def read_data(filepath): | |
df = read_csv(filepath, header=None, names=['user-id', | |
'activity', | |
'timestamp', | |
'X', | |
'Y', | |
'Z']) | |
## removing ';' from last column and converting it to float | |
df['Z'].replace(regex=True, inplace=True, to_replace=r';', value=r'') | |
df['Z'] = df['Z'].apply(convert_to_float) | |
return df | |
def convert_to_float(x): | |
try: | |
return np.float64(x) | |
except: | |
return np.nan | |
df = read_data('Dataset/WISDM_ar_v1.1/WISDM_ar_v1.1_raw.txt') | |
df |
plt.figure(figsize=(15, 5)) | |
plt.xlabel('Activity Type') | |
plt.ylabel('Training examples') | |
df['activity'].value_counts().plot(kind='bar', | |
title='Training examples by Activity Types') | |
plt.show() | |
plt.figure(figsize=(15, 5)) | |
plt.xlabel('User') | |
plt.ylabel('Training examples') | |
df['user-id'].value_counts().plot(kind='bar', | |
title='Training examples by user') | |
plt.show() |
当初我将收集的三个轴上的加速度计数据进行可视化。
def axis_plot(ax, x, y, title): | |
ax.plot(x, y, 'r') | |
ax.set_title(title) | |
ax.xaxis.set_visible(False) | |
ax.set_ylim([min(y) - np.std(y), max(y) + np.std(y)]) | |
ax.set_xlim([min(x), max(x)]) | |
ax.grid(True) | |
for activity in df['activity'].unique(): | |
limit = df[df['activity'] == activity][:180] | |
fig, (ax0, ax1, ax2) = plt.subplots(nrows=3, sharex=True, figsize=(15, 10)) | |
axis_plot(ax0, limit['timestamp'], limit['X'], 'x-axis') | |
axis_plot(ax1, limit['timestamp'], limit['Y'], 'y-axis') | |
axis_plot(ax2, limit['timestamp'], limit['Z'], 'z-axis') | |
plt.subplots_adjust(hspace=0.2) | |
fig.suptitle(activity) | |
plt.subplots_adjust(top=0.9) | |
plt.show() |
数据预处理
数据预处理是一项十分重要的工作,它使咱们的模型可能更好的利用咱们的原始数据。这里将应用的数据预处理办法有:
- 标签编码
- 线性插值
- 数据宰割
- 归一化
- 工夫序列宰割
- 独热编码
标签编码
因为模型不能承受非数字标签作为输出,咱们将在另一列中增加 ’ activity ‘ 列的编码标签,并将其命名为 ’ activityEncode ‘。标签被转换成如下所示的数字标签 (这个标签是咱们要预测的后果标签)
- Downstairs [0]
- Jogging [1]
- Sitting [2]
- Standing [3]
- Upstairs [4]
- Walking [5]
label_encode = LabelEncoder() | |
df['activityEncode'] = label_encode.fit_transform(df['activity'].values.ravel()) | |
df |
线性插值
利用线性插值能够防止采集过程中呈现 NaN 的数据失落的问题。它将通过插值法填充缺失的值。尽管在这个数据集中只有一个 NaN 值,但为了咱们的展现,还是须要实现它。
interpolation_fn = interp1d(df['activityEncode'] ,df['Z'], kind='linear') | |
null_list = df[df['Z'].isnull()].index.tolist() | |
for i in null_list: | |
y = df['activityEncode'][i] | |
value = interpolation_fn(y) | |
df['Z']=df['Z'].fillna(value) | |
print(value) |
数据宰割
依据用户 id 进行数据宰割,防止数据宰割谬误。咱们在训练集中应用 id 小于或等于 27 的用户,其余的在测试集中应用。
df_test = df[df['user-id'] > 27] | |
df_train = df[df['user-id'] <= 27] |
归一化
在训练之前,须要将数据特色归一化到 0 到 1 的范畴内。咱们用的办法是
df_train['X'] = (df_train['X']-df_train['X'].min())/(df_train['X'].max()-df_train['X'].min()) | |
df_train['Y'] = (df_train['Y']-df_train['Y'].min())/(df_train['Y'].max()-df_train['Y'].min()) | |
df_train['Z'] = (df_train['Z']-df_train['Z'].min())/(df_train['Z'].max()-df_train['Z'].min()) | |
df_train |
工夫序列宰割
因为咱们解决的是工夫序列数据,所以须要创立一个宰割的函数,标签名称和每个记录的范畴进行分段。此函数在 x_train 和 y_train 中执行特色的拆散,将每 80 个时间段分成一组数据。
def segments(df, time_steps, step, label_name): | |
N_FEATURES = 3 | |
segments = [] | |
labels = [] | |
for i in range(0, len(df) - time_steps, step): | |
xs = df['X'].values[i:i+time_steps] | |
ys = df['Y'].values[i:i+time_steps] | |
zs = df['Z'].values[i:i+time_steps] | |
label = mode(df[label_name][i:i+time_steps])[0][0] | |
segments.append([xs, ys, zs]) | |
labels.append(label) | |
reshaped_segments = np.asarray(segments, dtype=np.float32).reshape(-1, time_steps, N_FEATURES) | |
labels = np.asarray(labels) | |
return reshaped_segments, labels | |
TIME_PERIOD = 80 | |
STEP_DISTANCE = 40 | |
LABEL = 'activityEncode' | |
x_train, y_train = segments(df_train, TIME_PERIOD, STEP_DISTANCE, LABEL) |
这样,x_train 和 y_train 形态变为:
print('x_train shape:', x_train.shape) | |
print('Training samples:', x_train.shape[0]) | |
print('y_train shape:', y_train.shape) | |
x_train shape: (20334, 80, 3) | |
Training samples: 20334 | |
y_train shape: (20334,) |
这里还存储了一些前面用到的数据:时间段(time_period),传感器数(sensors)和类(num_classes)的数量。
time_period, sensors = x_train.shape[1], x_train.shape[2] | |
num_classes = label_encode.classes_.size | |
print(list(label_encode.classes_)) | |
['Downstairs', 'Jogging', 'Sitting', 'Standing', 'Upstairs', 'Walking'] |
最初须要应用 Reshape 将其转换为列表, 作为 keras 的输出
input_shape = time_period * sensors | |
x_train = x_train.reshape(x_train.shape[0], input_shape) | |
print("Input Shape:", input_shape) | |
print("Input Data Shape:", x_train.shape) | |
Input Shape: 240 | |
Input Data Shape: (20334, 240) |
最初须要将所有数据转换为 float32。
x_train = x_train.astype('float32') | |
y_train = y_train.astype('float32') |
独热编码
这是数据预处理的最初一步,咱们将通过编码标签并将其存储到 y_train_hot 中来执行。
y_train_hot = to_categorical(y_train, num_classes) | |
print("y_train shape:", y_train_hot.shape) | |
y_train shape: (20334, 6) |
模型
咱们应用的模型是一个由 8 层组成的序列模型。模型前两层由 LSTM 组成,每个 LSTM 具备 32 个神经元,应用的激活函数为 Relu。而后是用于提取空间特色的卷积层。
在两层的连接处须要扭转 LSTM 输入维度,因为输入具备 3 个维度(样本数,工夫步长,输出维度),而 CNN 则须要 4 维输出(样本数,1,工夫步长,输出)。
第一个 CNN 层具备 64 个神经元,另一个神经元有 128 个神经元。在第一和第二 CNN 层之间,咱们有一个最大池层来执行下采样操作。而后是全局均匀池(GAP)层将多 D 特色映射转换为 1 - D 特征向量,因为在此层中不须要参数,所以会缩小全局模型参数。而后是 BN 层,该层有助于模型的收敛性。
最初一层是模型的输入层,该输入层只是具备 SoftMax 分类器层的 6 个神经元的齐全连贯的层,该层示意以后类的概率。
model = Sequential() | |
model.add(LSTM(32, return_sequences=True, input_shape=(input_shape,1), activation='relu')) | |
model.add(LSTM(32,return_sequences=True, activation='relu')) | |
model.add(Reshape((1, 240, 32))) | |
model.add(Conv1D(filters=64,kernel_size=2, activation='relu', strides=2)) | |
model.add(Reshape((120, 64))) | |
model.add(MaxPool1D(pool_size=4, padding='same')) | |
model.add(Conv1D(filters=192, kernel_size=2, activation='relu', strides=1)) | |
model.add(Reshape((29, 192))) | |
model.add(GlobalAveragePooling1D()) | |
model.add(BatchNormalization(epsilon=1e-06)) | |
model.add(Dense(6)) | |
model.add(Activation('softmax')) | |
print(model.summary()) |
训练和后果
经过训练,模型给出了 98.02% 的准确率和 0.0058 的损失。训练 F1 得分为 0.96。
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy']) | |
history = model.fit(x_train, | |
y_train_hot, | |
batch_size= 192, | |
epochs=100 | |
) |
可视化训练的准确性和损失变动图。
plt.figure(figsize=(6, 4)) | |
plt.plot(history.history['accuracy'], 'r', label='Accuracy of training data') | |
plt.plot(history.history['loss'], 'r--', label='Loss of training data') | |
plt.title('Model Accuracy and Loss') | |
plt.ylabel('Accuracy and Loss') | |
plt.xlabel('Training Epoch') | |
plt.ylim(0) | |
plt.legend() | |
plt.show() | |
y_pred_train = model.predict(x_train) | |
max_y_pred_train = np.argmax(y_pred_train, axis=1) | |
print(classification_report(y_train, max_y_pred_train)) |
在测试数据集上测试它,但在通过测试集之前,须要对测试集进行雷同的预处理。
df_test['X'] = (df_test['X']-df_test['X'].min())/(df_test['X'].max()-df_test['X'].min()) | |
df_test['Y'] = (df_test['Y']-df_test['Y'].min())/(df_test['Y'].max()-df_test['Y'].min()) | |
df_test['Z'] = (df_test['Z']-df_test['Z'].min())/(df_test['Z'].max()-df_test['Z'].min()) | |
x_test, y_test = segments(df_test, | |
TIME_PERIOD, | |
STEP_DISTANCE, | |
LABEL) | |
x_test = x_test.reshape(x_test.shape[0], input_shape) | |
x_test = x_test.astype('float32') | |
y_test = y_test.astype('float32') | |
y_test = to_categorical(y_test, num_classes) |
在评估咱们的测试数据集后,失去了 89.14% 的准确率和 0.4647 的损失。F1 测试得分为 0.89。
score = model.evaluate(x_test, y_test) | |
print("Accuracy:", score[1]) | |
print("Loss:", score[0]) |
上面绘制混同矩阵更好地了解对测试数据集的预测。
predictions = model.predict(x_test) | |
predictions = np.argmax(predictions, axis=1) | |
y_test_pred = np.argmax(y_test, axis=1) | |
cm = confusion_matrix(y_test_pred, predictions) | |
cm_disp = ConfusionMatrixDisplay(confusion_matrix= cm) | |
cm_disp.plot() | |
plt.show() |
还能够在测试数据集上评估的模型的分类报告。
print(classification_report(y_test_pred, predictions))
总结
LSTM-CNN 模型的性能比任何其余机器学习模型要好得多。本文的代码能够在 GitHub 上找到。
https://avoid.overfit.cn/post/a6438a08d1d84923933b0a811d8edc11
您能够尝试本人实现它,通过优化模型来进步 F1 分数。
另:这个模型是来自于 Xia Kun, Huang Jianguang, and Hanyu Wang 在 IEEE 期刊上发表的论文 LSTM-CNN Architecture for Human Activity Recognition。
作者:Tanmay chauhan