代理指定ip用Python实现快速Ping一个IP网段地址!

原文链接:www.yjsec.com/2020/11/07

ping 代理指定ip命令是我们检查网络中最常用的命令,作为网络人员,基本上每天都会用到,可以很好地帮助我们分析和判定网络故障;

如果有 10 设备,100 台设备,1000 台设备怎么办?一个个 ping 过去人都要疯掉了,这种情况在大型网络中我们有可能遇到,那怎么办呢?我们今天来看下如何用 python 来实现批量 ping 测试主机。直接上代码:#!/usr/bin/python3# -*- coding: utf-8 -*-import osimport argparseimport socketimport structimport selectimport timeICMP_ECHO_REQUEST = 8 # Platform specificDEFAULT_TIMEOUT = 0.1DEFAULT_COUNT = 4class Pinger(object):    """ Pings to a host -- the Pythonic way"""    def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):        self.target_host = target_host        self.count = count        self.timeout = timeout    def do_checksum(self, source_string):        """  Verify the packet integritity """        sum = 0        max_count = (len(source_string)/2)*2        count = 0        while count < max_count:            val = source_string[count + 1]*256 + source_string[count]            sum = sum + val            sum = sum & 0xffffffff            count = count + 2        if max_count<len(source_string):            sum = sum + ord(source_string[len(source_string) - 1])            sum = sum & 0xffffffff        sum = (sum >> 16)  +  (sum & 0xffff)        sum = sum + (sum >> 16)        answer = ~sum        answer = answer & 0xffff        answer = answer >> 8 | (answer << 8 & 0xff00)        return answer    def receive_pong(self, sock, ID, timeout):        """        Receive ping from the socket.        """        time_remaining = timeout        while True:            start_time = time.time()            readable = select.select([sock], [], [], time_remaining)            time_spent = (time.time() - start_time)            if readable[0] == []: # Timeout                return            time_received = time.time()            recv_packet, addr = sock.recvfrom(1024)            icmp_header = recv_packet[20:28]            type, code, checksum, packet_ID, sequence = struct.unpack(       "bbHHh", icmp_header   )            if packet_ID == ID:                bytes_In_double = struct.calcsize("d")                time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0]                return time_received - time_sent            time_remaining = time_remaining - time_spent            if time_remaining <= 0:                return    def send_ping(self, sock,  ID):        """        Send ping to the target host        """        target_addr  =  socket.gethostbyname(self.target_host)        my_checksum = 0        # Create a dummy heder with a 0 checksum.        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)        bytes_In_double = struct.calcsize("d")        data = (192 - bytes_In_double) * "Q"        data = struct.pack("d", time.time()) + bytes(data.encode(utf-8))        # Get the checksum on the data and the dummy header.        my_checksum = self.do_checksum(header + data)        header = struct.pack(      "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1  )        packet = header + data        sock.sendto(packet, (target_addr, 1))    def ping_once(self):        """        Returns the delay (in seconds) or none on timeout.        """        icmp = socket.getprotobyname("icmp")        try:            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)        except socket.error as e:            if e.errno == 1:                # Not superuser, so operation not permitted                e.msg +=  "ICMP messages can only be sent from root user processes"                raise socket.error(e.msg)        except Exception as e:            print("Exception: %s" %(e))        my_ID = os.getpid() & 0xFFFF        self.send_ping(sock, my_ID)        delay = self.receive_pong(sock, my_ID, self.timeout)        sock.close()        return delay    def ping(self):        """        Run the ping process        """        for i in range(self.count):            print ("Ping to %s..." % self.target_host,)            try:                delay  =  self.ping_once()            except socket.gaierror as e:                print ("Ping failed. (socket error: %s)" % e[1])                break            if delay  ==  None:                print ("Ping failed. (timeout within %ssec.)" % self.timeout)            else:                delay = delay * 1000                print("Get pong in %0.4fms" % delay)if __name__ == __main__:    alive = []    host_prefix = 192.168.242.    for i in range(1, 255):        host = host_prefix + str(i)        pinger = Pinger(target_host=host)        delay = pinger.ping_once()        if delay == None:            print("Ping %s 失败,超时2秒" % host)        else:            print("ping %s = %s ms" % (host, round(delay * 1000, 4)))            alive.append(host)        # time.sleep(0.5)

测试如下:

后台回复“加群”,带你进入高手如云交流群

推荐阅读:

深入理解DPDK程序设计|Linux网络2.0

一文读懂基于Kubernetes打造的边缘计算

网络方案 Cilium 入门教程

聊聊非阻塞I/O编程

Linux内核调度器源码分析

Docker  代理指定ip容器技术使用指南

Linux下的一些资源限制

Linux 系统安全强化指南

云原生/云计算发展白皮书(附下载)

Linux 常用监控指标总结

Kubernetes 集群网络从懵圈到熟悉

使用 GDB+Qemu 调试 Linux 内核

防火墙双机热备

常见的几种网络故障案例分析与解决

Kubernetes容器之间的通信浅谈

kube-proxy 如何与 iptables 配合使用

完美排查入侵者

Kubernetes 万字实战教程 (2021最新版)

Kubernetes 常见问题总结

一文详解负载均衡和反向代理的真实区别

TCP协议灵魂 12 问,总会用得到

QUIC也不是万能的

超详干货!Linux环境变量配置全攻略

为什么要选择智能网卡?

网络排错大讲解~

OVS 和 OVS-DPDK 对比

微软出品的最新K8S学习指南3.0下载

▼喜欢,就给我一个“在看”

10T 技术资源大放送!包括但不限于:云计算、虚拟化、微服务、大数据、网络、Linux、Docker、Kubernetes、Python、Go、C/C++、Shell、PPT 等。在内回复「1024」,即可免费获取!!