网络拓扑可视化 之四 前端拓扑数据生成(网络拓扑可视化分析)

网友投稿 561 2022-09-11


网络拓扑可视化 之四 前端拓扑数据生成(网络拓扑可视化分析)

之前博文:

画拓扑,本质上就是生成 NeXt UI 前端库所需要的 JSON 数据。

1.画拓扑图的代码文件结构:

2.主要代码draw_topo.py:

"""Build the topology JSON consumed by the NeXt UI front end.

Given a source and a destination host IP, the script:

1. finds the switch each host is attached to through the ARP table,
2. follows the routing table (longest-prefix match) hop by hop from the
   source switch, resolving each next-hop interface to its neighbor through
   the LLDP link table,
3. writes the collected nodes and links to a JavaScript file as
   ``var topologyData = {...};``.

State is accumulated in module-level globals during one ``draw_topology``
call and cleared when the call finishes.
"""
import os

import django

# Django must be configured before any model module is imported.
os.environ['DJANGO_SETTINGS_MODULE'] = 'topology.settings'
django.setup()

from arp.models import Device, ArpTable  # noqa: E402
from route.models import Route, NextHop  # noqa: E402
from lldp.models import Link  # noqa: E402
import pytricia  # noqa: E402
import json  # noqa: E402

# Links collected so far; each item is ((hostname, port), (hostname, port)).
GLOBAL_LINK = []
# hostname -> numeric node id used by the front end.
DEVICE_NUM_MAPPING = {}
# Last node id handed out; ids uniquely identify devices when building
# links (1 is the source host, 2 is the destination host).
DEVICE_NUM = 0

SRC_HOST_IP = None
DST_HOST_IP = None
SRC_HOST_NAME = None
DST_HOST_NAME = None
SRC_SW = None
DST_SW = None

OUTPUT_TOPOLOGY_FILENAME = 'draw_topology/topology.js'


def get_arp_switch(ipaddr):
    """Find the switch and interface whose ARP table contains ``ipaddr``.

    Returns:
        A ``(hostname, interface)`` tuple, e.g.
        ``('router5', 'GigabitEthernet0/3')``, or ``None`` when no ARP entry
        matches.
    """
    # first() fetches a single row instead of loading every match just to
    # test for emptiness.
    arp = ArpTable.objects.filter(ipaddress=ipaddr).first()
    if arp is None:
        return None
    return arp.device.hostname, arp.interface


def get_devevice_info_by_hostname(hostname):
    """Return the ``Device`` row for ``hostname``, or ``None`` if absent."""
    return Device.objects.filter(hostname=hostname).first()


def search_route(destination, hostname):
    """Longest-prefix-match ``destination`` in the routes of one device.

    Args:
        destination: Destination IP address to look up.
        hostname: Device name, e.g. ``'router5'``.

    Returns:
        ``(hostname, matched_prefix, nexthop_interfaces)`` on a match,
        otherwise ``None`` (no routes on the device, or no prefix covers
        the destination).
    """
    routes = Route.objects.filter(device__hostname=hostname)
    if not routes:
        # The original guard was ``len(...) < 0``, which can never be true.
        return None

    route_tree = pytricia.PyTricia()
    for route in routes:
        prefix = route.subnet
        nexthops = NextHop.objects.filter(
            route__subnet=prefix,
            route__device__hostname=hostname,
        )
        route_tree[prefix] = [nexthop.interface for nexthop in nexthops]

    longest_prefix = route_tree.get_key(destination)
    if not longest_prefix:
        return None
    return hostname, longest_prefix, route_tree[longest_prefix]


def get_remote_port_neighbor(hostname, local_ports):
    """Resolve local ports of ``hostname`` to their LLDP neighbors.

    Returns:
        ``{local_port: {'remote_device_name': x, 'remote_port': y}}``.
        Ports without a link row are absent from the result; if several
        rows exist for one port, the last one iterated wins.
    """
    remote_info = {}
    for local_port in local_ports:
        links = Link.objects.filter(
            local_device__hostname=hostname,
            local_port=local_port,
        )
        for link in links:
            remote_info[local_port] = {
                'remote_device_name': link.remote_device_name,
                'remote_port': link.remote_port,
            }
    return remote_info


def add_device_num_mapping(hostname):
    """Assign the next node id to ``hostname`` unless it already has one."""
    global DEVICE_NUM
    global DEVICE_NUM_MAPPING
    if hostname not in DEVICE_NUM_MAPPING:
        DEVICE_NUM += 1
        DEVICE_NUM_MAPPING[hostname] = DEVICE_NUM


def add_link(link):
    """Record ``link`` unless it, or its reverse, is already recorded."""
    global GLOBAL_LINK
    reverse_link = (link[1], link[0])
    if link not in GLOBAL_LINK and reverse_link not in GLOBAL_LINK:
        GLOBAL_LINK.append(link)


def draw_initial(search_src_ip, target_ip):
    """Seed the topology with both hosts and their access switches.

    Returns:
        ``False`` when source and destination are the same address (only a
        single node named after that address is registered), ``True``
        otherwise.

    Raises:
        ValueError: If either address has no ARP entry, so its access
            switch cannot be determined.
    """
    global SRC_HOST_IP
    global DST_HOST_IP
    global SRC_HOST_NAME
    global DST_HOST_NAME
    global SRC_SW
    global DST_SW

    if search_src_ip == target_ip:
        add_device_num_mapping(search_src_ip)
        return False

    # Look both switches up before touching any global so a failed lookup
    # does not leave half-initialised state behind.
    src_sw_info = get_arp_switch(search_src_ip)
    if src_sw_info is None:
        raise ValueError(
            'no ARP entry found for source host {}'.format(search_src_ip))
    dst_sw_info = get_arp_switch(target_ip)
    if dst_sw_info is None:
        raise ValueError(
            'no ARP entry found for destination host {}'.format(target_ip))

    # TODO: look up source/destination host details in the CMDB; the names
    # are hard-coded for now.
    # TODO: validate the input addresses.
    SRC_HOST_NAME = 'src_host'
    DST_HOST_NAME = 'dst_host'
    SRC_HOST_IP = search_src_ip
    DST_HOST_IP = target_ip
    add_device_num_mapping(SRC_HOST_NAME)
    add_device_num_mapping(DST_HOST_NAME)

    SRC_SW = src_sw_info[0]
    add_device_num_mapping(SRC_SW)
    DST_SW = dst_sw_info[0]
    add_device_num_mapping(DST_SW)

    add_link((('src_host', 'eth0'), src_sw_info))
    add_link((('dst_host', 'eth0'), dst_sw_info))
    return True


def get_iterable_obj_keys(iter):
    """Return the items yielded by iterating ``iter`` (dict keys) as a list."""
    # The parameter name shadows the builtin; it is kept so existing keyword
    # callers keep working.
    return list(iter)


def draw(hostname, destination, visited=None):
    """Walk from ``hostname`` towards ``destination``, recording the path.

    Works like a recursive directory walk: for every next-hop interface of
    the best route on ``hostname``, the LLDP neighbor is registered as a
    node, the link is recorded, and the walk continues from that neighbor
    until the destination's access switch (``DST_SW``) is reached.

    Args:
        hostname: Device to continue the walk from.
        destination: Destination IP address.
        visited: Devices already expanded during this walk. Callers normally
            omit it; it prevents unbounded recursion on routing loops.

    Returns:
        ``True`` if ``hostname`` is ``DST_SW`` or a newly explored branch
        reached it, ``False`` otherwise.
    """
    if hostname == DST_SW:
        return True

    if visited is None:
        visited = set()
    visited.add(hostname)

    route = search_route(destination, hostname)
    if route is None:
        # No usable route on this device: dead end instead of a TypeError.
        return False

    reached = False
    for nexthop_iface in route[2]:
        neighbor = get_remote_port_neighbor(
            hostname, [nexthop_iface]).get(nexthop_iface)
        if neighbor is None:
            # No LLDP neighbor known on this interface; skip it.
            continue

        remote_name = neighbor['remote_device_name']
        add_device_num_mapping(remote_name)
        add_link((
            (hostname, nexthop_iface),
            (remote_name, neighbor['remote_port']),
        ))

        # The link is still recorded above, but a device is expanded once.
        if remote_name in visited:
            continue
        if draw(remote_name, destination, visited):
            reached = True
    return reached


#
# Interface name adaptation
#
interface_full_name_map = {
    'Eth': 'Ethernet',
    'Fa': 'FastEthernet',
    'Gi': 'GigabitEthernet',
    'Te': 'TenGigabitEthernet',
}


def if_fullname(ifname):
    """Expand an abbreviated interface name, e.g. ``Gi0/1``, to long form."""
    for short, full in interface_full_name_map.items():
        if ifname.startswith(full):
            return ifname
        if ifname.startswith(short):
            # Swap the leading prefix only; str.replace would also rewrite
            # later occurrences of the abbreviation.
            return full + ifname[len(short):]
    return ifname


def if_shortname(ifname):
    """Abbreviate a long interface name, e.g. ``GigabitEthernet0/1``."""
    for short, full in interface_full_name_map.items():
        if ifname.startswith(full):
            # Swap the leading prefix only (see if_fullname).
            return short + ifname[len(full):]
    return ifname


#
# Icon selection
#
icon_capability_map = {
    'router': 'router',
    'switch': 'switch',
    'bridge': 'switch',
    'station': 'host'
}

icon_model_map = {
    'CSR1000V': 'router',
    'Nexus': 'switch',
    'IOSXRv': 'router',
    'IOSv': 'switch',
    '2901': 'router',
    '2911': 'router',
    '2921': 'router',
    '2951': 'router',
    '4321': 'router',
    '4331': 'router',
    '4351': 'router',
    '4421': 'router',
    '4431': 'router',
    '4451': 'router',
    '2960': 'switch',
    '3750': 'switch',
    '3850': 'switch',
}


def get_icon_type(device_cap_name, device_model=''):
    """Pick the icon name for a device.

    Selection order:
    - LLDP capability mapping.
    - Device model mapping (first key that is a substring of the model).
    - Default ``'unknown'``.
    """
    if device_cap_name:
        icon_type = icon_capability_map.get(device_cap_name)
        if icon_type:
            return icon_type
    if device_model:
        for model_shortname, icon_type in icon_model_map.items():
            if model_shortname in device_model:
                return icon_type
    return 'unknown'


#
# Device role layers
#
# Device layers in the topology, in sort order.
NX_LAYER_SORT_ORDER = (
    'undefined',
    'outside',
    'edge_switch',
    'edge_router',
    'core_router',
    'core_switch',
    'distribution_router',
    'distribution_switch',
    'leaf',
    'spine',
    'access_switch'
)


def get_node_layer_sort_preference(device_role):
    """Return the 1-based position of ``device_role`` in NX_LAYER_SORT_ORDER.

    Unknown roles get ``1``, the same value as ``'undefined'``.
    """
    try:
        return NX_LAYER_SORT_ORDER.index(device_role) + 1
    except ValueError:
        return 1


#
# Topology data generation
#
def generate_topology_json():
    """Build the ``{'nodes': [...], 'links': [...]}`` dict from the globals.

    Nodes come from ``DEVICE_NUM_MAPPING``, enriched with ``Device`` data
    when a row exists for the hostname; links come from ``GLOBAL_LINK`` with
    interface names shortened by ``if_shortname``.
    """
    host_id_map = DEVICE_NUM_MAPPING
    topology_dict = {'nodes': [], 'links': []}

    for host in DEVICE_NUM_MAPPING:
        device_model = 'n/a'
        device_serial = 'n/a'
        device_ip = 'n/a'
        # NOTE(review): layerSortPreference ends up as an int for devices
        # found in the database and as the string 'undefined' otherwise;
        # confirm the front end copes with the mixed types.
        device_role = 'undefined'

        # Look the device up by hostname to get its IP and other details.
        device_info = get_devevice_info_by_hostname(host)
        if device_info:
            device_model = device_info.model
            device_serial = device_info.serial_number
            device_ip = device_info.ip
            device_role = get_node_layer_sort_preference(device_info.role)

        if host == SRC_HOST_NAME:
            device_role = 'undefined'
            device_ip = SRC_HOST_IP
        if host == DST_HOST_NAME:
            device_role = 'undefined'
            device_ip = DST_HOST_IP

        topology_dict['nodes'].append({
            'id': DEVICE_NUM_MAPPING[host],
            'name': host,
            'primaryIP': device_ip,
            'model': device_model,
            'serial_number': device_serial,
            'layerSortPreference': device_role,
            'icon': 'router',  # hard-coded for now; get_icon_type is unused
        })

    for link_id, link in enumerate(GLOBAL_LINK):
        (src_device, src_port), (tgt_device, tgt_port) = link
        topology_dict['links'].append({
            'id': link_id,
            'source': host_id_map[src_device],
            'target': host_id_map[tgt_device],
            'srcIfName': if_shortname(src_port),
            'srcDevice': src_device,
            'tgtIfName': if_shortname(tgt_port),
            'tgtDevice': tgt_device,
        })
    return topology_dict


TOPOLOGY_FILE_HEAD = "\n\nvar topologyData = "


def write_topology_file(topology_json, header=TOPOLOGY_FILE_HEAD,
                        dst=OUTPUT_TOPOLOGY_FILENAME):
    """Write ``header`` + pretty-printed JSON + ``;`` to the file ``dst``."""
    payload = json.dumps(topology_json, indent=4, sort_keys=True)
    with open(dst, 'w', encoding='utf-8') as topology_file:
        topology_file.write(header + payload + ';')


def clean_global_var():
    """Reset the link list, the node id mapping and the node id counter."""
    global GLOBAL_LINK
    global DEVICE_NUM_MAPPING
    global DEVICE_NUM
    GLOBAL_LINK = []
    DEVICE_NUM_MAPPING = {}
    DEVICE_NUM = 0


def draw_topology(src_host_ip, dst_host_ip):
    """Generate and write the topology between two host IPs.

    Surrounding whitespace is stripped from both addresses. Returns ``True``
    once the file has been written; lookup failures propagate as exceptions.
    """
    # TODO: the input really should be validated here.
    try:
        if draw_initial(src_host_ip.strip(), dst_host_ip.strip()):
            draw(SRC_SW, DST_HOST_IP)
        write_topology_file(generate_topology_json())
    finally:
        # The globals must be cleared after every run, including failed
        # ones, otherwise stale nodes and links leak into the next call.
        clean_global_var()
    return True


if __name__ == '__main__':
    draw_topology('57.1.1.7', '69.1.1.9')

3.最后会生成一个js文件(topology.js),类似如下截图:

下一篇文章介绍如何用 NeXt UI 渲染拓扑。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:springboot如何通过session实现单点登入详解
下一篇:什么是SSL证书(ssl证书是什么意思啊)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~