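# NOTE: scraped web-page banner, not part of the script ("Code pull complete; the page will refresh automatically").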
import json
import pandas as pd
from py2neo import Graph, Node, Relationship, NodeMatcher, RelationshipMatcher
from concurrent.futures import ThreadPoolExecutor
import time
###########################################################################
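# Revised to drop the earlier mistaken assumption that two nodes may be joined by several relationships with the same label (because the destination node may have several ports). A new problem remains: to address the efficiency concern raised by a senior labmate, a thread pool was tried, but the results come out wrong with it. Without the thread pool the results are correct.
# Revised again; the change is in the thread-pool section. In principle it is still equivalent to processing one line at a time, and the lines interfere with each other.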
###########################################################################
# Input file name. The script below reads it line by line and parses each line
# with json.loads, so it is expected to hold one JSON object per line.
# NOTE(review): absolute, machine-specific path — adjust before running elsewhere.
filename = "C:/Users/yinwe/Desktop/data.json"
# Connect to the database.
### centos7+neo4j5.3.0
# graph = Graph("http://192.168.86.128:7474/", user="neo4j", password="12345678", name="neo4j")
### win10+neo4j3.3.9
# NOTE(review): credentials are hard-coded in source; consider loading them from
# the environment or a config file.
graph = Graph("http://localhost:7474/",auth=("neo4j", "123456"))
# Node lookup helper bound to the connection above.
node_matcher = NodeMatcher(graph)
# relation_matcher = RelationshipMatcher(graph)
def process_data(data):
    """Record one observed connection from ``data`` in the Neo4j graph.

    Ensures that a ``SrcHost`` node for ``data['src_ip']`` and a ``DstHost``
    node for ``data['dst_ip']`` exist, then either creates the ``Connect``
    relationship between them (``num`` = 1, ``dst_port`` = the port of this
    record) or, if one already exists, increments its ``num`` counter.  Only
    one ``Connect`` relationship is kept per host pair, and ``dst_port`` is
    written only when the relationship is first created.

    Args:
        data: Mapping providing the keys ``src_ip``, ``dst_ip`` and
            ``dst_port``.  Any other keys (e.g. ``src_port``) are ignored.

    Raises:
        KeyError: If one of the required keys is missing from ``data``.
    """
    src_ip = data['src_ip']
    dst_ip = data['dst_ip']
    dst_port = data['dst_port']

    # One parameterised statement replaces the previous
    # "MATCH, then look up / create nodes, then CREATE or SET" sequence:
    #   * a single round trip per record instead of up to five;
    #   * no window between the existence check and the create in which a
    #     concurrent caller can insert the same nodes/relationship;
    #   * values are passed as parameters rather than concatenated into the
    #     query text, so quotes in the input cannot alter the query.
    # NOTE(review): per the Neo4j MERGE documentation, uniqueness of merged
    # nodes under concurrent writers is only guaranteed when a uniqueness
    # constraint exists on the merged property — consider adding constraints
    # on SrcHost.ip and DstHost.ip if this is called from several threads.
    query = """
    MERGE (n1:SrcHost {ip: $src_ip})
    MERGE (n2:DstHost {ip: $dst_ip})
    MERGE (n1)-[r:Connect]->(n2)
    ON CREATE SET r.dst_port = $dst_port, r.num = 1
    ON MATCH SET r.num = r.num + 1
    """
    graph.run(query, src_ip=src_ip, dst_ip=dst_ip, dst_port=dst_port)
# Time the whole import: reading the file plus all graph writes.
t1 = time.time()
# Keep every submitted task so that errors raised inside worker threads can be
# reported afterwards; executor.submit() alone would discard them silently.
futures = []
# Read the file: one JSON object per line, streamed instead of loaded at once.
with open(filename, 'r', encoding='UTF-8') as file_in:
    # Process the records on a thread pool. Workers call process_data
    # concurrently, so records are not guaranteed to be applied in file order
    # and process_data must be safe to call from several threads at once.
    with ThreadPoolExecutor() as executor:
        for line in file_in:
            # Skip blank lines (e.g. trailing newlines), which are not JSON.
            if not line.strip():
                continue
            # json.loads turns the JSON text of the line into a dict.
            data = json.loads(line)
            futures.append(executor.submit(process_data, data))
# Leaving the executor block waits for all submitted tasks to finish.
t2 = time.time()
errors = [
    error
    for error in (future.exception() for future in futures)
    if error is not None
]
if errors:
    print(f'{len(errors)} of {len(futures)} records failed; first error: {errors[0]!r}')
print(f'time:{t2 - t1}s')
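# NOTE: scraped web-page notice, not part of the script: "This section may contain content unsuitable for display and is hidden by the page. You can review and edit it with the relevant editing functions."
# "If you confirm the content contains no inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / false or worthless content, and nothing that violates national laws and regulations, you can click submit to appeal and it will be handled as soon as possible."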