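"""CNN/ResNet + GRU/TCN models in PyTorch for bearing remaining-useful-life (RUL)
prediction. A 1-D ResNet (or plain CNN) maps FFT-preprocessed vibration windows to
an RUL estimate plus a compact feature vector; a GRU or TCN then regresses RUL from
sequences of those features. The bearing names below follow the PHM 2012 bearing
prognostics dataset ('phm_data')."""
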
import math
import numpy as np
import random
import matplotlib.pyplot as plt
from collections import OrderedDict
from dataset import DataSet
import torch
from torch import nn, optim
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torch.nn.utils import weight_norm
class BasicBlock(nn.Module):
    # Two-convolution residual block (ResNet "basic" variant) adapted to 1-D signals.
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        m = OrderedDict()
        m['conv1'] = nn.Conv1d(inplanes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        m['bn1'] = nn.BatchNorm1d(planes)
        m['relu1'] = nn.ReLU(inplace=True)
        m['conv2'] = nn.Conv1d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        m['bn2'] = nn.BatchNorm1d(planes)
        self.group1 = nn.Sequential(m)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        if self.downsample is not None:
            residual = self.downsample(x)
        else:
            residual = x
        out = self.group1(x) + residual
        out = self.relu(out)
        return out

class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(Bottleneck, self).__init__()
        m = OrderedDict()
        m['conv1'] = nn.Conv1d(inplanes, planes, kernel_size=1, bias=False)
        m['bn1'] = nn.BatchNorm1d(planes)
        m['relu1'] = nn.ReLU(inplace=True)
        m['conv2'] = nn.Conv1d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        m['bn2'] = nn.BatchNorm1d(planes)
        m['relu2'] = nn.ReLU(inplace=True)
        m['conv3'] = nn.Conv1d(planes, planes * 4, kernel_size=1, bias=False)
        m['bn3'] = nn.BatchNorm1d(planes * 4)
        self.group1 = nn.Sequential(m)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        if self.downsample is not None:
            residual = self.downsample(x)
        else:
            residual = x
        out = self.group1(x) + residual
        out = self.relu(out)
        return out

class ResNet(nn.Module):
    # 1-D ResNet backbone; expects FFT-preprocessed input of shape (batch, 4, 1280).
    def __init__(self, block, layers, feature_size=8):
        self.inplanes = 64
        super(ResNet, self).__init__()
        m = OrderedDict()
        m['conv1'] = nn.Conv1d(4, 64, kernel_size=6, stride=2, padding=2, bias=False)
        m['bn1'] = nn.BatchNorm1d(64)
        m['relu1'] = nn.ReLU(inplace=True)
        m['maxpool'] = nn.MaxPool1d(kernel_size=2, stride=2, padding=0)
        self.group1 = nn.Sequential(m)
        self.layer1 = self._make_layer(block, 64, layers[0], stride=1)
        self.layer2 = self._make_layer(block, 128, layers[1], stride=4)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=4)
        self.layer4 = self._make_layer(block, 256, layers[3], stride=4)
        self.avgpool = nn.AvgPool1d(5)
        self.group2 = nn.Sequential(
            OrderedDict([
                ('fc', nn.Linear(256 * block.expansion, feature_size))
            ])
        )
        self.output = nn.Linear(feature_size, 1)
        # He initialization for Conv1d weights; unit scale / zero shift for BatchNorm.
        for m in self.modules():
            if isinstance(m, nn.Conv1d):
                n = m.kernel_size[0] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm1d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv1d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm1d(planes * block.expansion),
            )
        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.group1(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        feature = self.group2(x)
        x = self.output(feature)
        return x, feature

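# Shape walkthrough for the ResNet above (a sketch, assuming the (batch, 4, 1280)
# input produced by CNN_GRU._fft below; lengths follow floor((L + 2p - k)/s) + 1):
#   conv1 k=6 s=2 p=2: 1280 -> 640    maxpool k=2 s=2: 640 -> 320
#   layer1 (s=1): 320    layer2 (s=4): 320 -> 80
#   layer3 (s=4): 80 -> 20    layer4 (s=4): 20 -> 5
#   avgpool(5): 5 -> 1, so the flattened vector has 256 * block.expansion features,
#   matching the 'fc' layer in group2.
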
class Chomp1d(nn.Module):
    # Trims the trailing elements added by padding, keeping dilated convolutions causal.
    def __init__(self, chomp_size):
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        return x[:, :, :-self.chomp_size].contiguous()

class TemporalBlock(nn.Module):
    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super(TemporalBlock, self).__init__()
        self.conv1 = weight_norm(nn.Conv1d(n_inputs, n_outputs, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp1 = Chomp1d(padding)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)
        self.conv2 = weight_norm(nn.Conv1d(n_outputs, n_outputs, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp2 = Chomp1d(padding)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)
        self.net = nn.Sequential(self.conv1, self.chomp1, self.relu1, self.dropout1,
                                 self.conv2, self.chomp2, self.relu2, self.dropout2)
        self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
        self.relu = nn.ReLU()
        self.init_weights()

    def init_weights(self):
        self.conv1.weight.data.normal_(0, 0.01)
        self.conv2.weight.data.normal_(0, 0.01)
        if self.downsample is not None:
            self.downsample.weight.data.normal_(0, 0.01)

    def forward(self, x):
        out = self.net(x)
        res = x if self.downsample is None else self.downsample(x)
        return self.relu(out + res)

class TCN(nn.Module):
    def __init__(self, num_input_channel, num_channels, num_block=None, kernel_size=3, dropout=0.1):
        super(TCN, self).__init__()
        layers = []
        num_levels = len(num_channels)
        num_block = [1] * len(num_channels) if num_block is None else num_block
        for i in range(num_levels):
            dilation_size = 2 ** i
            in_channels = num_input_channel if i == 0 else num_channels[i - 1]
            out_channels = num_channels[i]
            for j in range(num_block[i]):
                in_channels = num_channels[i] if j > 0 else in_channels
                layers += [TemporalBlock(in_channels, out_channels, kernel_size, stride=1, dilation=dilation_size,
                                         padding=(kernel_size - 1) * dilation_size, dropout=dropout)]
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        x = self.network(x)
        return x

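# Receptive-field check for the TCN (a sketch, using the configuration passed in
# CNN_GRU._build_tcn below: kernel_size=3, num_channels=[64, 64, 64, 1],
# num_block=[1, 2, 3, 1]). Each TemporalBlock holds two convolutions, each adding
# (kernel_size - 1) * dilation to the receptive field, so:
#   RF = 1 + sum_i 2 * (3 - 1) * 2**i * num_block[i]
#      = 1 + 4 * (1*1 + 2*2 + 4*3 + 8*1) = 101
# which just covers the 100-step feature windows built in _g_preprocess.
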
class CNN(nn.Module):
    def __init__(self, feature_size):
        super(CNN, self).__init__()
        self.cnn_net = nn.Sequential(
            nn.Conv1d(2, 32, 65, 1, 32),    # in_shape (2,2560)
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.MaxPool1d(8),                # out_shape (32,320)
            nn.Conv1d(32, 32, 5, 1, 2),     # in_shape (32,320)
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.MaxPool1d(4),                # out_shape (32,80)
            nn.Conv1d(32, 64, 3, 1, 1),     # in_shape (32,80)
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(2),                # out_shape (64,40)
            nn.Conv1d(64, 64, 3, 1, 1),     # in_shape (64,40)
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(2),                # out_shape (64,20)
            nn.Conv1d(64, 128, 3, 1, 1),    # in_shape (64,20)
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.MaxPool1d(2),                # out_shape (128,10)
            nn.Conv1d(128, 128, 3, 1, 1),   # in_shape (128,10)
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.MaxPool1d(2),                # out_shape (128,5)
        )
        self.nn_net = nn.Sequential(
            nn.Dropout(0.25),
            nn.Linear(128 * 5, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(256, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Linear(128, feature_size),
            nn.BatchNorm1d(feature_size),
            nn.ReLU()
        )
        self.out = nn.Linear(feature_size, 1)

    def forward(self, x):
        x = self.cnn_net(x)
        x = x.view(x.size(0), -1)   # flatten the conv output to (batch_size, 128*5)
        x = self.nn_net(x)
        output = self.out(x)
        return output, x            # also return the feature vector x for visualization

class GRU(nn.Module):
    def __init__(self, feature_size):
        super(GRU, self).__init__()
        self.gru_1 = nn.GRU(feature_size, 32, 2, batch_first=True)  # input size must match h in _gru_fit
        # self.gru_2 = nn.GRU(32,1,1,batch_first=True)
        self.linear = nn.Linear(32, 1)

    def forward(self, x, h):
        x, h = self.gru_1(x, h)
        # x,h = self.gru_2(x,h)
        x = self.linear(x)
        return x, h

class Custom_loss(nn.Module):
    # Squared error normalized by (true RUL + 1): errors made early in life,
    # when the true RUL is large, are penalized less.
    def __init__(self):
        super(Custom_loss, self).__init__()

    def forward(self, pred, tru):
        return torch.mean((pred - tru) ** 2 / (tru + 1))

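# A quick sanity check of the normalized loss (hypothetical values):
#   Custom_loss()(torch.tensor([10.0]), torch.tensor([9.0]))
#   -> (10 - 9)**2 / (9 + 1) = 0.1
# The same quantity is reported as the running error metric in the fit loops below.
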
class CNN_GRU():
    def __init__(self):
        self.feature_size = 8
        self.dataset = DataSet.load_dataset(name='phm_data')
        self.train_bearings = ['Bearing1_1','Bearing1_2','Bearing2_1','Bearing2_2','Bearing3_1','Bearing3_2']
        self.test_bearings = ['Bearing1_3','Bearing1_4','Bearing1_5','Bearing1_6','Bearing1_7',
                              'Bearing2_3','Bearing2_4','Bearing2_5','Bearing2_6','Bearing2_7',
                              'Bearing3_3']

    def _build_cnn(self):
        # model = CNN(self.feature_size)
        model = ResNet(Bottleneck, [3, 4, 23, 3])
        weight_p, bias_p = [], []
        # for name, p in model.named_parameters():
        #     if 'bias' in name:
        #         bias_p += [p]
        #     else:
        #         weight_p += [p]
        # # Parameter names are generated automatically: every weight name contains
        # # 'weight' and every bias name contains 'bias', so parameters can be sorted
        # # by name. This differs from TensorFlow, where users may also assign their
        # # own names (though TensorFlow auto-names by default too).
        # self.cnn_optimizer = torch.optim.Adam([
        #     {'params': weight_p, 'weight_decay': 1e-6},
        #     {'params': bias_p, 'weight_decay': 0}
        # ], lr=1e-3)
        self.cnn_optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # optimize all CNN parameters
        # self.cnn_loss_func = Custom_loss()
        self.cnn_loss_func = nn.MSELoss()
        if torch.cuda.is_available():
            model = model.cuda()
        return model

    def _build_gru(self):
        model = GRU(self.feature_size)
        self.gru_optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        self.gru_loss_func = nn.MSELoss()
        if torch.cuda.is_available():
            model = model.cuda()
        return model

    def _build_tcn(self):
        model = TCN(
            num_input_channel=self.feature_size,
            num_channels=[64, 64, 64, 1],
            num_block=[1, 2, 3, 1],
            kernel_size=3,
            dropout=0.1
        )
        # Parameter names are generated automatically: every weight name contains
        # 'weight' and every bias name contains 'bias', so weight decay can be
        # applied to weights only and not to biases.
        weight_p, bias_p = [], []
        for name, p in model.named_parameters():
            if 'bias' in name:
                bias_p += [p]
            else:
                weight_p += [p]
        self.tcn_optimizer = torch.optim.Adam([
            {'params': weight_p, 'weight_decay': 1e-6},
            {'params': bias_p, 'weight_decay': 0}
        ], lr=1e-3)
        # self.tcn_optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        self.tcn_loss_func = nn.MSELoss()
        # self.tcn_loss_func = Custom_loss()
        if torch.cuda.is_available():
            model = model.cuda()
        return model

    def _normalize(self, data):
        r_data = np.zeros_like(data)
        for i in range(r_data.shape[0]):
            r_data[i,] = (data[i] - np.mean(data[i])) / max(abs(np.max(data[i])), abs(np.min(data[i])))
            # r_data[i,] = ((data[i]-np.min(data[i]))/(np.max(data[i])-np.min(data[i]))-0.5)*2
        return r_data

    def _fft(self, data):
        fft_data = np.fft.fft(data, axis=2) / data.shape[2]
        r_fft_data = np.append(fft_data[:, :, 0:int(data.shape[2] / 2)].real,
                               fft_data[:, :, 0:int(data.shape[2] / 2)].imag, axis=1)
        return r_fft_data

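    # Shape note for _fft (a sketch): a (N, 2, 2560) batch of raw vibration windows
    # becomes (N, 4, 1280) -- the half-spectrum of length 1280, with real parts
    # stacked above imaginary parts along the channel axis -- which is exactly the
    # input shape ResNet's first convolution expects.
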
    def _add_noise(self, data, snr=0):
        # snr is given in dB; convert it to a linear power ratio.
        snr = 10 ** (snr / 10.0)
        for i in range(data.size()[0]):
            xpower = torch.sum(data[i,] ** 2) / data[i,].numel()
            npower = xpower / snr
            data[i,] += torch.randn(size=data[i,].size()) * torch.sqrt(npower)
        return data

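    # Worked example: snr=-4 dB gives a linear ratio of 10**(-0.4) ~= 0.398, so the
    # injected Gaussian noise carries roughly 2.5x the power of the signal itself.
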
    def _c_preprocess(self, select='train', is_random=True):
        if select == 'train':
            temp_data = self.dataset.get_value('data', condition={'bearing_name': self.train_bearings})
            temp_label = self.dataset.get_value('RUL', condition={'bearing_name': self.train_bearings})
        elif select == 'test':
            temp_data = self.dataset.get_value('data', condition={'bearing_name': self.test_bearings})
            temp_label = self.dataset.get_value('RUL', condition={'bearing_name': self.test_bearings})
        else:
            raise ValueError('wrong selection!')
        train_data = np.array([])
        train_label = np.array([])
        for i, x in enumerate(temp_label):
            t_label = [y for y in range(round(x), round(x + temp_data[i].shape[0]))]
            t_label.reverse()
            if train_data.size == 0:
                train_data = temp_data[i]
                train_label = np.array(t_label)
            else:
                train_data = np.append(train_data, temp_data[i], axis=0)
                train_label = np.append(train_label, np.array(t_label), axis=0)
        assert train_data.shape[0] == train_label.shape[0]
        if is_random:
            idx = [x for x in range(train_data.shape[0])]
            random.shuffle(idx)
            train_data = train_data[idx]
            train_label = train_label[idx]
        return np.transpose(train_data, (0, 2, 1)), train_label[:, np.newaxis]

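    # Label construction above, illustrated (hypothetical numbers): a bearing with
    # recorded RUL x = 100 and 5 windows gets labels [104, 103, 102, 101, 100] --
    # a countdown ending at the recorded RUL on the final window.
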
    def _g_preprocess(self, model, select, is_random=True):
        if select == 'train':
            temp_data = self.dataset.get_value('data', condition={'bearing_name': self.train_bearings})
            temp_label = self.dataset.get_value('RUL', condition={'bearing_name': self.train_bearings})
        elif select == 'test':
            temp_data = self.dataset.get_value('data', condition={'bearing_name': self.test_bearings})
            temp_label = self.dataset.get_value('RUL', condition={'bearing_name': self.test_bearings})
        else:
            raise ValueError('wrong selection!')
        r_temp_label = []
        r_temp_data = []
        for i, x in enumerate(temp_label):
            t_label = [y for y in range(round(x), round(x + temp_data[i].shape[0]))]
            t_label.reverse()
            r_temp_label.append(np.array(t_label))
            r_temp_data.append(self._cnn_predict(model, self._normalize(self._fft(np.transpose(temp_data[i], (0, 2, 1)))))[1])
        r_data = []
        r_label = []
        if is_random:
            for i in range(10000):
                bearing_idx = random.randint(0, len(r_temp_data) - 1)
                random_bearing = r_temp_data[bearing_idx]
                random_bearing_RUL = r_temp_label[bearing_idx]
                start_idx = random.randint(0, random_bearing.shape[0] - 101)
                # end_idx = start_idx + random.randint(50,100)
                end_idx = start_idx + 100
                r_t_data = random_bearing[start_idx:end_idx,]
                if r_t_data.shape[0] < 100:
                    r_t_data = np.append(np.zeros((100 - r_t_data.shape[0], self.feature_size)), r_t_data, axis=0)
                r_data.append(r_t_data)
                r_label.append(random_bearing_RUL[start_idx:end_idx])
        else:
            for i in range(len(r_temp_data)):
                for j in range(len(r_temp_data[i]) - 100):
                    r_data.append(r_temp_data[i][j:j + 100,])
                    r_label.append(r_temp_label[i][j:j + 100])
        return np.array(r_data), np.array(r_label)

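    # Sampling above (a sketch): training mode draws 10000 random fixed-length
    # windows of 100 consecutive CNN feature vectors, giving arrays of shape
    # (10000, 100, feature_size) and (10000, 100); evaluation mode instead slides
    # a stride-1 window over every bearing so each window keeps its aligned
    # 100-step RUL target.
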
    def _cnn_fit(self, model, data, label, batch_size, epochs, snr=-4):
        model.train()
        data_loader = dataset_ndarry_pytorch(data, label, batch_size, True)
        print_per_sample = 2000
        for epoch in range(epochs):
            counter_per_epoch = 0
            for i, (x_data, x_label) in enumerate(data_loader):
                x_data = x_data.type(torch.FloatTensor)
                x_label = x_label.type(torch.FloatTensor)
                if snr is not None:
                    x_data = self._add_noise(x_data, snr)
                if torch.cuda.is_available():
                    x_data = Variable(x_data).cuda()
                    x_label = Variable(x_label).cuda()
                else:
                    x_data = Variable(x_data)
                    x_label = Variable(x_label)
                # forward pass
                [out, feature] = model(x_data)
                loss = self.cnn_loss_func(out, x_label)
                # backward pass
                self.cnn_optimizer.zero_grad()
                loss.backward()
                self.cnn_optimizer.step()
                # running averages of the loss and the normalized squared error
                temp_acc = float(np.mean((out.data.cpu().numpy() - x_label.data.cpu().numpy()) ** 2 / (x_label.data.cpu().numpy() + 1)))
                if i == 0:
                    p_loss = loss
                    p_acc = temp_acc
                else:
                    p_loss += (loss - p_loss) / (i + 1)
                    p_acc += (temp_acc - p_acc) / (i + 1)
                if i * batch_size > counter_per_epoch:
                    print('Epoch: ', epoch, '| train loss: %.4f' % p_loss.data.cpu().numpy(), '| train error: %.2f' % p_acc)
                    counter_per_epoch += print_per_sample
        torch.cuda.empty_cache()  # release cached GPU memory

    def _cnn_predict(self, model, data):
        batch_size = 32
        model.eval()
        prediction = []
        for i in range(math.ceil(data.shape[0] / batch_size)):
            x_data = data[i * batch_size:min(data.shape[0], (i + 1) * batch_size),]
            x_data = torch.from_numpy(x_data)
            x_data = x_data.type(torch.FloatTensor)
            x_data = Variable(x_data).cuda() if torch.cuda.is_available() else Variable(x_data)
            x_prediction = model(x_data)
            # accumulate each model output (RUL estimate, feature vector) across batches
            if len(prediction) == 0:
                for j, x in enumerate(x_prediction):
                    prediction.append(x_prediction[j].data.cpu().numpy())
            else:
                for j, x in enumerate(prediction):
                    prediction[j] = np.append(x, x_prediction[j].data.cpu().numpy(), axis=0)
            del x_prediction
        return prediction

    def test_cnn(self):
        c_train_data, c_train_label = self._c_preprocess()
        c_train_data = self._normalize(c_train_data)
        c_train_data = self._fft(c_train_data)
        self.cnn = self._build_cnn()
        self._cnn_fit(self.cnn, c_train_data, c_train_label, 64, 80, None)
        torch.save(self.cnn, './model/cnn')
        self.cnn = torch.load('./model/cnn')
        c_test_data, c_test_label = self._c_preprocess('test', False)
        c_test_data = self._normalize(c_test_data)
        c_test_data = self._fft(c_test_data)
        [predict_label, _] = self._cnn_predict(self.cnn, c_test_data)
        acc = np.mean(np.square(predict_label - c_test_label) / (c_test_label + 1))
        plt.subplot(2, 1, 1)
        plt.plot(c_test_label)
        plt.scatter([x for x in range(predict_label.shape[0])], predict_label, s=2)
        plt.title(str(acc))
        c_test_data, c_test_label = self._c_preprocess('train', False)
        c_test_data = self._normalize(c_test_data)
        c_test_data = self._fft(c_test_data)
        [predict_label, _] = self._cnn_predict(self.cnn, c_test_data)
        acc = np.mean(np.square(predict_label - c_test_label) / (c_test_label + 1))
        plt.subplot(2, 1, 2)
        plt.plot(c_test_label)
        plt.scatter([x for x in range(predict_label.shape[0])], predict_label, s=2)
        plt.title(str(acc))
        plt.savefig('./model/temp.png', dpi=900)
        plt.show()

    def _gru_fit(self, model, data, label, batch_size, epochs):
        model.train()
        data_loader = dataset_ndarry_pytorch(data, label, batch_size, True)
        print_per_sample = 2000
        for epoch in range(epochs):
            counter_per_epoch = 0
            for i, (x_data, x_label) in enumerate(data_loader):
                x_data = x_data.type(torch.FloatTensor)
                x_label = x_label.type(torch.FloatTensor)
                # initial hidden state: (num_layers, batch, hidden_size) for the 2-layer GRU
                h = torch.zeros(2, x_data.size()[0], 32)
                h = h.type(torch.FloatTensor)
                if torch.cuda.is_available():
                    x_data = Variable(x_data).cuda()
                    x_label = Variable(x_label).cuda()
                    h = Variable(h).cuda()
                else:
                    x_data = Variable(x_data)
                    x_label = Variable(x_label)
                    h = Variable(h)
                # forward pass
                out = model(x_data, h)[0]
                # out = out[:,-1,:]
                out = out.view(out.shape[0], -1)
                loss = self.gru_loss_func(out, x_label)
                # backward pass
                self.gru_optimizer.zero_grad()
                loss.backward()
                self.gru_optimizer.step()
                temp_acc = torch.mean((out - x_label) ** 2 / (x_label + 1))
                if i == 0:
                    p_loss = loss
                    p_acc = temp_acc
                else:
                    p_loss += (loss - p_loss) / (i + 1)
                    p_acc += (temp_acc - p_acc) / (i + 1)
                if i * batch_size > counter_per_epoch:
                    print('Epoch: ', epoch, '| train loss: %.4f' % p_loss.data.cpu().numpy(), '| train error: %.2f' % p_acc.data.cpu().numpy())
                    counter_per_epoch += print_per_sample
        torch.cuda.empty_cache()  # release cached GPU memory

    def test_gru(self):
        model = torch.load('./model/resnet101')
        # data.shape = (10000, 100, feature_size), label.shape = (10000, 100)
        g_train_data, g_train_label = self._g_preprocess(model, 'train')
        self.gru = self._build_gru()
        self._gru_fit(self.gru, g_train_data, g_train_label, 64, 100)
        torch.save(self.gru, './model/gru')

    def _tcn_fit(self, model, data, label, batch_size, epochs):
        model.train()
        data_loader = dataset_ndarry_pytorch(data, label, batch_size, True)
        print_per_sample = 2000
        for epoch in range(epochs):
            counter_per_epoch = 0
            for i, (x_data, x_label) in enumerate(data_loader):
                x_data = x_data.type(torch.FloatTensor)
                x_label = x_label.type(torch.FloatTensor)
                if torch.cuda.is_available():
                    x_data = Variable(x_data).cuda()
                    x_label = Variable(x_label).cuda()
                else:
                    x_data = Variable(x_data)
                    x_label = Variable(x_label)
                # forward pass
                out = model(x_data)
                out = out.view(out.shape[0], -1)
                loss = self.tcn_loss_func(out, x_label)
                # backward pass
                self.tcn_optimizer.zero_grad()
                loss.backward()
                self.tcn_optimizer.step()
                temp_acc = torch.mean((out - x_label) ** 2 / (x_label + 1))
                if i == 0:
                    p_loss = loss
                    p_acc = temp_acc
                else:
                    p_loss += (loss - p_loss) / (i + 1)
                    p_acc += (temp_acc - p_acc) / (i + 1)
                if i * batch_size > counter_per_epoch:
                    print('Epoch: ', epoch, '| train loss: %.4f' % p_loss.data.cpu().numpy(), '| train error: %.2f' % p_acc.data.cpu().numpy())
                    counter_per_epoch += print_per_sample
        torch.cuda.empty_cache()  # release cached GPU memory

    def _tcn_predict(self, model, data):
        batch_size = 32
        model.eval()
        prediction = []
        for i in range(math.ceil(data.shape[0] / batch_size)):
            x_data = data[i * batch_size:min(data.shape[0], (i + 1) * batch_size),]
            x_data = torch.from_numpy(x_data)
            x_data = x_data.type(torch.FloatTensor)
            x_data = Variable(x_data).cuda() if torch.cuda.is_available() else Variable(x_data)
            x_prediction = model(x_data)
            x_prediction = x_prediction if isinstance(x_prediction, list) else [x_prediction]
            if len(prediction) == 0:
                for j, x in enumerate(x_prediction):
                    prediction.append(x_prediction[j].data.cpu().numpy())
            else:
                for j, x in enumerate(prediction):
                    prediction[j] = np.append(x, x_prediction[j].data.cpu().numpy(), axis=0)
            del x_prediction
        return prediction

    def test_tcn(self):
        cnn_model = torch.load('./model/resnet101_with_fft')
        g_train_data, g_train_label = self._g_preprocess(cnn_model, 'train')
        g_train_data = np.transpose(g_train_data, (0, 2, 1))
        self.tcn = self._build_tcn()
        self._tcn_fit(self.tcn, g_train_data, g_train_label, 64, 100)
        torch.save(self.tcn, './model/temp_tcn')
        tcn_model = torch.load('./model/temp_tcn')
        g_data, g_label = self._g_preprocess(cnn_model, 'test', False)
        g_data = np.transpose(g_data, (0, 2, 1))
        predict_label = self._tcn_predict(tcn_model, g_data)[0]
        predict_label = predict_label.reshape(-1, 100)
        acc = np.mean(np.square(predict_label - g_label) / (g_label + 1))
        p_g_label = g_label[:, -1].reshape(-1,)
        p_predict_label = predict_label[:, -1].reshape(-1,)
        plt.subplot(2, 1, 1)
        plt.plot(p_g_label)
        plt.scatter([x for x in range(p_predict_label.shape[0])], p_predict_label, s=2)
        plt.title(str(acc))
        g_data, g_label = self._g_preprocess(cnn_model, 'train', False)
        g_data = np.transpose(g_data, (0, 2, 1))
        predict_label = self._tcn_predict(tcn_model, g_data)[0]
        predict_label = predict_label.reshape(-1, 100)
        acc = np.mean(np.square(predict_label - g_label) / (g_label + 1))
        p_g_label = g_label[:, -1].reshape(-1,)
        p_predict_label = predict_label[:, -1].reshape(-1,)
        plt.subplot(2, 1, 2)
        plt.plot(p_g_label)
        plt.scatter([x for x in range(p_predict_label.shape[0])], p_predict_label, s=2)
        plt.title(str(acc))
        plt.show()

def dataset_ndarry_pytorch(data, label, batch_size, shuffle):
    assert data.shape[0] == label.shape[0]

    class CustomDataset(torch.utils.data.Dataset):
        def __init__(self, data, label):
            self.data = data
            self.label = label

        def __getitem__(self, index):
            data, label = self.data[index,], self.label[index,]
            return data, label

        def __len__(self):
            return len(self.data)

    customdataset = CustomDataset(data, label)
    return DataLoader(customdataset, batch_size=batch_size, shuffle=shuffle)

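# Example usage of the loader (hypothetical shapes):
#   loader = dataset_ndarry_pytorch(np.zeros((128, 4, 1280)), np.zeros((128, 1)),
#                                   batch_size=32, shuffle=True)
#   for x, y in loader:
#       ...  # x arrives as a float64 tensor, hence the .type(torch.FloatTensor)
#            # casts in the fit loops above.
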
if __name__ == '__main__':
    process = CNN_GRU()
    # process.test_cnn()
    # process.test_gru()
    process.test_tcn()