1 Star 0 Fork 0

PAGNHU/DeepLog

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
LogKeyModel_predict.py 3.95 KB
一键复制 编辑 原始数据 按行查看 历史
PAGNHU 提交于 2024-03-28 14:32 . commit
import torch
import torch.nn as nn
import time
import argparse
# Device configuration: run on the GPU when CUDA is usable, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
def generate(name, window_size=None):
    """Load the log-key sessions stored in ``data/<name>``.

    Every non-blank line of the file is read as one session made of
    whitespace-separated integers.  Each integer is decremented by one
    (presumably converting 1-based key ids to 0-based ones -- confirm against
    the data files).  Sessions shorter than ``window_size + 1`` are
    right-padded with ``-1`` so that each one yields at least one
    (window, next-key) pair.

    Args:
        name: File name inside the ``data/`` directory.
        window_size: Length of the history window.  When omitted, the
            module-level ``window_size`` is used, which keeps existing
            ``generate(name)`` call sites working.

    Returns:
        A set of tuples, one per distinct session.

    Raises:
        NameError: If ``window_size`` is neither passed nor defined at
            module level.
        ValueError: If a token in the file is not an integer.
    """
    if window_size is None:
        # Backward compatibility: the script sets ``window_size`` at module
        # level before calling ``generate(name)``.
        try:
            window_size = globals()['window_size']
        except KeyError:
            raise NameError("name 'window_size' is not defined") from None
    min_length = window_size + 1

    # If you want to replicate the DeepLog paper results, collect the sessions
    # in a list instead of a set to keep the full dataset; a set is used here
    # only for testing and acceleration (duplicate sessions are collapsed).
    hdfs = set()
    with open('data/' + name, 'r') as f:
        for ln in f:
            keys = [int(token) - 1 for token in ln.split()]
            if not keys:
                # A blank line is not a session; without this guard it would
                # become an all-padding tuple that is always flagged later.
                continue
            keys += [-1] * (min_length - len(keys))
            hdfs.add(tuple(keys))
    print('Number of sessions({}): {}'.format(name, len(hdfs)))
    return hdfs
class Model(nn.Module):
    """Stacked LSTM followed by a linear layer that scores every log key.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each LSTM layer's hidden state.
        num_layers: Number of stacked LSTM layers.
        num_keys: Size of the output layer (one score per key).
    """

    def __init__(self, input_size, hidden_size, num_layers, num_keys):
        super(Model, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True: inputs are laid out as (batch, seq_len, input_size).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_keys)

    def forward(self, x):
        """Return the key scores predicted from the last time step of ``x``.

        Args:
            x: Tensor of shape (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, num_keys).
        """
        # Build the zero initial states from ``x`` itself so they always share
        # its device and dtype, instead of relying on a module-level device.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = x.new_zeros(state_shape)
        c0 = x.new_zeros(state_shape)
        out, _ = self.lstm(x, (h0, c0))
        # Only the output at the final time step feeds the classifier.
        return self.fc(out[:, -1, :])
def _count_flagged_sessions(model, sessions, window_size, input_size, num_candidates, device):
    """Count the sessions in which at least one next-key prediction misses.

    A session is flagged as soon as, for some window of ``window_size``
    consecutive keys, the key that actually follows is not among the
    ``num_candidates`` highest-scoring outputs of ``model``.
    """
    flagged = 0
    with torch.no_grad():
        for line in sessions:
            for i in range(len(line) - window_size):
                seq = torch.tensor(line[i:i + window_size], dtype=torch.float)
                seq = seq.view(-1, window_size, input_size).to(device)
                output = model(seq)
                # argsort is ascending, so the tail holds the top candidates.
                candidates = torch.argsort(output, 1)[0][-num_candidates:].tolist()
                if line[i + window_size] not in candidates:
                    flagged += 1
                    break  # one miss is enough to flag the whole session
    return flagged


def _percentage(numerator, denominator):
    """Return ``100 * numerator / denominator``, or 0.0 for a zero denominator."""
    if not denominator:
        return 0.0
    return 100 * numerator / denominator


if __name__ == '__main__':
    # NOTE: these names are deliberately assigned at module level rather than
    # inside a main() function, because generate() may read the module-level
    # ``window_size``.

    # Hyperparameters
    num_classes = 28
    input_size = 1
    model_path = 'model/Adam_batch_size=2048_epoch=300.pt'
    parser = argparse.ArgumentParser()
    parser.add_argument('-num_layers', default=2, type=int)
    parser.add_argument('-hidden_size', default=64, type=int)
    parser.add_argument('-window_size', default=10, type=int)
    parser.add_argument('-num_candidates', default=9, type=int)
    args = parser.parse_args()
    num_layers = args.num_layers
    hidden_size = args.hidden_size
    window_size = args.window_size
    num_candidates = args.num_candidates

    model = Model(input_size, hidden_size, num_layers, num_classes).to(device)
    # map_location lets a checkpoint saved on a GPU load on a CPU-only host.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    print('model_path: {}'.format(model_path))
    test_normal_loader = generate('hdfs_test_normal')
    test_abnormal_loader = generate('hdfs_test_abnormal')

    # Test the model
    start_time = time.time()
    # A flagged normal session is a false positive; a flagged abnormal
    # session is a true positive.
    FP = _count_flagged_sessions(
        model, test_normal_loader, window_size, input_size, num_candidates, device)
    TP = _count_flagged_sessions(
        model, test_abnormal_loader, window_size, input_size, num_candidates, device)
    elapsed_time = time.time() - start_time
    print('elapsed_time: {:.3f}s'.format(elapsed_time))

    # Compute precision, recall and F1-measure; each metric falls back to 0.0
    # when its denominator is zero instead of raising ZeroDivisionError.
    FN = len(test_abnormal_loader) - TP
    P = _percentage(TP, TP + FP)
    R = _percentage(TP, TP + FN)
    F1 = 2 * P * R / (P + R) if P + R else 0.0
    print('false positive (FP): {}, false negative (FN): {}, Precision: {:.3f}%, Recall: {:.3f}%, F1-measure: {:.3f}%'.format(FP, FN, P, R, F1))
    print('Finished Predicting')
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/panghu-2001/deep-log.git
git@gitee.com:panghu-2001/deep-log.git
panghu-2001
deep-log
DeepLog
master

搜索帮助