开始│├─ 数据准备│ ├─ 导入Kaggle数据集│ ├─ 遍历数据文件│ └─ 合并数据集│├─ 数据预处理│ ├─ 转换数据格式(宽表→长表)│ ├─ 数据质量检查(样本量/时长)│ └─ 异常传感器处理(排除a1)│├─ 时域分析│ ├─ 可视化时间序列│ ├─ 分布分析(箱型图)│ └─ 特征提取(统计特征)│├─ 特征工程│ ├─ 分割数据片段(1000点/段)│ ├─ 计算统计特征(均值/标准差等)│ └─ 构建特征矩阵│├─ 机器学习建模│ ├─ 划分训练/测试集│ ├─ 随机森林训练│ ├─ 模型评估(准确率)│ └─ 多模型比较(lazypredict)│├─ 频域分析│ ├─ FFT频谱分析│ ├─ 峰值检测│ ├─ 功率谱密度(PSD)│ └─ 健康/故障PSD对比│├─ 时频分析│ └─ 谱图可视化│└─ 结束
详细算法步骤
数据准备阶段 :
从Kaggle下载齿轮箱故障诊断数据集
遍历数据集目录结构,识别符合命名规范的文件
从文件名提取关键信息:状态(健康/断齿)、负载值
合并所有数据文件为统一DataFrame
数据预处理 :
转换数据格式:将宽格式(每列一个传感器)转换为长格式(堆叠传感器读数)
分析数据完整性:计算最小样本量及对应时长
识别并排除异常传感器(a1)对整体分布的影响
时域分析 :
可视化不同负载下健康/故障状态的时间序列
使用箱型图比较各传感器在不同状态下的读数分布
计算全局读数分布,验证数据质量
特征工程 :
将连续信号分割为固定长度片段(1000个样本)
为每个片段计算多种统计特征:
传感器类型指示器
负载值
均值、标准差
峰度、偏度
高阶矩
构建特征矩阵和标签向量(故障=1,健康=0)
机器学习建模 :
划分训练集(80%)和测试集(20%),保持类别比例
训练随机森林分类器
评估模型准确率
使用lazypredict自动比较多种分类算法性能
频域分析 :
对单传感器数据进行FFT变换,获取频谱
检测频谱中的显著峰值
计算功率谱密度(PSD)分析信号能量分布
比较健康/故障状态下PSD的差异和相关程度
时频分析 :
计算并可视化所有传感器在不同状态下的谱图
展示信号频率成分随时间的变化
算法在信号分析中的应用
| 故障检测 | ||
| 故障分类 | ||
| 负载影响分析 | ||
| 传感器评估 | ||
| 频带特征提取 | ||
| 状态趋势监测 | ||
| 信号质量评估 | ||
| 故障定位 | ||
| 运行优化 | ||
| 诊断模型验证 |
结合机器学习和深度学习
| 特征自动提取 | ||
| 多传感器融合 | ||
| 异常检测 | ||
| 迁移学习 | ||
| 注意力机制 | ||
| 时序建模 | ||
| 图神经网络 | ||
| 生成对抗网络 | ||
| 多任务学习 | ||
| 模型可解释性 | ||
| 在线学习 |
# 导入Kaggle数据集import kagglehub# 下载齿轮箱故障诊断数据集brjapon_gearbox_fault_diagnosis_path = kagglehub.dataset_download('brjapon/gearbox-fault-diagnosis')print('数据源导入完成。')# 导入基础数据分析库import numpy as np # 数值计算库import pandas as pd # 数据处理库# 遍历输入目录下的所有文件import osfor dirname, _, filenames in os.walk('/kaggle/input'):for filename in filenames:print(os.path.join(dirname, filename)) # 打印文件路径# 导入可视化与分析库import matplotlib.pyplot as plt # 数据可视化import seaborn as sns # 高级数据可视化from scipy import fft, signal, stats # 信号处理与统计from tqdm.notebook import tqdm # 进度条工具"""数据集说明:齿轮箱故障诊断数据集包含使用SpectraQuest齿轮箱故障诊断模拟器记录的振动数据。数据集使用4个不同方向的振动传感器记录,负载从0%到90%变化。包含两种场景:1) 健康状态2) 断齿状态共20个文件,健康齿轮箱10个,断齿齿轮箱10个。每个文件对应0-90%的负载(步长10%)。文件名结构:- 首字符:"h"表示健康,"b"表示断齿- 中间包含"30hz"表示采样率- 末尾字符:负载值(0-90)"""# 初始化数据存储列表dfs = []# 遍历输入目录下的所有文件for dirname, _, filenames in tqdm(os.walk('/kaggle/input')):for filename in tqdm(filenames, leave=False):# 检查文件名是否符合预期模式if not (filename.startswith('h') or filename.startswith('b')) or '30hz' not in filename or not filename.endswith('.csv'):continue # 跳过不符合模式的文件# 从文件名提取状态(健康/断齿)state = filename[0]# 从文件名提取负载值load = int(filename.split('.')[0][5:])# 读取CSV文件df = pd.read_csv(os.path.join(dirname, filename))# 添加状态列df['state'] = state# 添加负载列df['load'] = load# 添加到列表dfs.append(df)# 合并所有数据集并重置索引df = pd.concat(dfs).reset_index().rename(columns={'index':'sample_index'})# 将数据转换为"长格式"(传感器读数堆叠)sensor_readings = df.melt(id_vars=['sample_index','state','load'], # 保留的标识列value_vars=['a1','a2','a3','a4'], # 要堆叠的传感器列var_name='sensor', # 新列名:传感器类型value_name='reading' # 新列名:读数)# 基本探索性数据分析(EDA)# 可视化各文件样本数量分布sns.countplot(data=sensor_readings[sensor_readings.sensor=='a1'], # 使用传感器a1的数据y='load', # y轴为负载hue='state', # 按状态着色)# 计算最小样本数及对应时长lowest_samples = df.groupby(['state','load']).sample_index.count().min()print(f'最小样本数 = {lowest_samples}')print(f'对应时长 = {lowest_samples/30:0.2f}秒 或 {lowest_samples/30/60:0.2f}分钟')# 数据筛选辅助函数def rdg(df, state=None, load=None, sensor=None):"""根据状态、负载和传感器筛选数据"""df_st = df[df.state==state] if state is not None else dfdf_lo = df_st[df_st.load==load] if load is not None else df_stdf_se = df_lo[df_lo.sensor==sensor] if sensor is not None else df_loreturn df_se# 时域分析:绘制传感器读数时间序列g = sns.FacetGrid(data=pd.concat([rdg(sensor_readings, load=0, sensor='a1'), # 负载0%的数据rdg(sensor_readings, load=90, sensor='a1') # 负载90%的数据]),col='load', # 按负载分列row='state', # 按状态分行height=2.5, # 子图高度aspect=2.5 # 子图宽高比)g.map(plt.plot, 'reading') # 在每个子图上绘制读数plt.show()# 使用箱型图比较读数分布print('每行表示不同传感器,每列表示不同负载')print('每个子图展示健康与故障齿轮箱的读数分布')sns.catplot(data=sensor_readings,col='load', row='sensor', # 按负载分列,按传感器分行x='state', y='reading', # x轴为状态,y轴为读数kind='boxen', # 使用箱型图变体height=10 # 图形高度)# 分析传感器a1对整体分布的影响display(sensor_readings.groupby(['sensor','state']).reading.std().unstack())print(f"包含传感器a1的标准差: {sensor_readings.reading.std()}")print(f"排除传感器a1的标准差: {sensor_readings[sensor_readings.sensor!='a1'].reading.std()}")# 排除传感器a1的数据readings = sensor_readings[sensor_readings.sensor!='a1']# 特征工程:创建机器学习特征data = [] # 特征数据存储labels = [] # 标签存储# 分组处理:按状态、负载、传感器分组for (state,load,sensor),g in sensor_readings.groupby(['state','load','sensor']):vals = g.reading.values # 获取该组的所有读数# 将数据分割为1000个样本的片段splits = np.split(vals, range(1000,vals.shape[0],1000))for s in splits[:-1]: # 跳过最后一个可能不完整的片段# 提取时域特征data.append({'sensor_a1': int(sensor=='a1'), # 传感器指示器'sensor_a2': int(sensor=='a2'),'sensor_a3': int(sensor=='a3'),'load': load, # 负载值'mean': np.mean(s), # 均值'std': np.std(s), # 标准差'kurt': stats.kurtosis(s), # 峰度'skew': stats.skew(s), # 偏度'moment': stats.moment(s), # 矩})labels.append(int(state=='b')) # 标签:1表示故障,0表示健康# 转换为DataFramedf_data = pd.DataFrame(data)data = df_data.valueslabels = np.array(labels)# 数据集统计print(f'总样本数: {len(labels)}')print(f'故障类样本: {np.sum(labels)} ({np.sum(labels)/len(labels):0.1%})')# 划分训练集和测试集from sklearn.model_selection import train_test_splitX_train, X_test, y_train, y_test = train_test_split(data, labels,train_size=0.8, # 80%训练random_state=42, # 随机种子stratify=labels # 分层抽样保持类别比例)print(f'训练数据: {X_train.shape}')print(f'测试数据: {X_test.shape}')# 随机森林模型训练与评估from sklearn.ensemble import RandomForestClassifierfrom sklearn.metrics import accuracy_scoremodel = RandomForestClassifier(random_state=42)model.fit(X_train, y_train)y_pred = model.predict(X_test)print(f'准确率: {accuracy_score(y_test, y_pred):0.2%}')# 使用lazypredict自动比较多个模型get_ipython().system('pip install lazypredict')from lazypredict.Supervised import LazyClassifierclf = LazyClassifier(verbose=0, ignore_warnings=True)models, predictions = clf.fit(X_train, X_test, y_train, y_test)print(models.head()) # 显示表现最好的模型# 频域分析:快速傅里叶变换(FFT)sensor_data = rdg(sensor_readings, 'h', 10, 'a4').reading.valuesy = np.abs(fft.rfft(sensor_data)) # 计算FFT幅度x = fft.rfftfreq(sensor_data.shape[0], 1/30) # 计算频率轴(30Hz采样率)plt.plot(x, y)plt.xlabel('频率 (Hz)')plt.ylabel('信号强度')plt.show()# 在频谱中检测峰值fig, ax = plt.subplots(figsize=(25,3))ax.plot(x, y)# 设置峰值检测参数x_peak_spacing = y.shape[0] / x.max() # 最小峰值间距(1Hz)x_peak_prominence = np.quantile(y,0.99) # 最小峰值显著度(99%分位数)# 检测峰值peaks, _ = signal.find_peaks(y, distance=x_peak_spacing, prominence=x_peak_prominence)for peak in peaks:ax.scatter(x=x[peak], y=y[peak], c='r', marker='o', s=30) # 标记峰值ax.set_xlabel('频率 (Hz)')ax.set_ylabel('信号强度')plt.show()# 功率谱密度(PSD)分析x, y = signal.welch(sensor_data, fs=30) # 使用Welch方法计算PSDplt.plot(x, y)plt.xlabel('频率 (Hz)')plt.ylabel('功率谱密度 (V^2/Hz)')plt.show()# 比较不同状态下的PSDfig, ax = plt.subplots(ncols=4, nrows=10, figsize=(15,20))for ((load,sensor), dfg), axi in tqdm(zip(sensor_readings.groupby(['load','sensor']), ax.ravel()), total=40):# 分离健康和故障数据healthy_raw = dfg[dfg.state=='h'].reading.valuesbroken_raw = dfg[dfg.state=='b'].reading.values# 计算PSDfh, Ph = signal.welch(healthy_raw, fs=30)fb, Pb = signal.welch(broken_raw, fs=30)# 绘制并计算相关性axi.plot(fh, Ph, c='g', alpha=0.6, label='健康')axi.plot(fb, Pb, c='r', alpha=0.6, label='故障')axi.set_ylim(0,50)axi.set_title(f'负载={load}, 传感器 {sensor}, 相关性: {np.corrcoef(Ph, Pb)[0,1]:0.2}')plt.tight_layout()plt.show()# 时频分析:谱图可视化fig, ax = plt.subplots(ncols=8, nrows=10, figsize=(25,20))for ((load, sensor, state), dfg), axi in tqdm(zip(sensor_readings.groupby(['load','sensor','state']), ax.ravel()), total=80):raw = dfg.reading.values# 计算谱图f, t, Sxx = signal.spectrogram(raw, fs=30)# 绘制谱图axi.pcolormesh(t, f, Sxx, cmap='nipy_spectral')axi.set_title(f'负载={load}, 传感器 {sensor}, 状态 {state}')plt.tight_layout()plt.show()
https://www.zhihu.com/consult/people/792359672131756032?isMe=1擅长领域:信号滤波/降噪,机器学习/深度学习,时间序列预分析/预测,设备故障诊断/缺陷检测/异常检测