[Python100行系列]-多IP地址端口扫描工具[协程+进程]

[Python100行系列]-多IP地址扫描端口[协程+进程]

Posted by Hzy on February 7, 2020

Hzy的博客

完整代码

今天试试用协程+进程来写一个简单的局域网ip扫描工具。

1.首先需要一个进程池,这里使用的是multiprocessing.pool

1.把要扫描的ip均匀分割,然后启动一个进程任务。

1
2
3
4
5
6
7
8
def run(self):
    len_split = 1 if len(self.ip_list) // 8 <= 1 else len(self.ip_list)//8
    split_ip_lists = [self.ip_list[i:i + len_split] for i in range(0, len(self.ip_list), len_split)]
    with pool.Pool(8) as p:
        for split_ip_list in split_ip_lists:
            p.apply_async(self.process_start_task, args=(split_ip_list,))
        p.close()
        p.join()

2. 然后在写,进程里需要执行的任务。

1
2
3
4
5
6
7
def process_start_task(self, ip_list):
    loop = asyncio.get_event_loop()
    tasks = []
    for ip in ip_list:
        task = asyncio.ensure_future(self.Task(i)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))

进程里:

  • 1.启动一个协程的事件循环
  • 2.添加任务到循环中
  • 3.等待任务完成

3. 协程中执行的任务

1
2
3
async def Task(self, ip, arguments="-Pn"ports=None):
    result = self.nm.scan(ip, arguments=arguments)
    print(datetime.now(), result)

这里我用的python-nmap来进行扫描,当然也可以自己写个,比如使用ping

1
2
3
4
5
6
7
8
9
    fnull = open(os.devnull, 'w')
    ipaddr = 'ping ' + str(ip)
    result = subprocess.call(ipaddr + ' -n 2', shell=True, stdout=fnull, stderr=fnull)
    current_time = time.strftime('%Y%m%d-%H:%M:%S', time.localtime())
    if result:
        print('时间:{} ip地址:{} ping fall'.format(current_time, ipaddr))
    else:
        print('时间:{} ip地址:{} ping ok'.format(current_time, ipaddr))
    fnull.close()

4 最后看下完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
from multiprocessing import Process, pool, Manager
from nmap import nmap
from datetime import datetime
import time


class Scan():
    def __init__(self, ip_list=None):
        self.nm = nmap.PortScanner()
        self.ip_list = ip_list if ip_list else ["127.0.0.1"]
        self.results = []
    async def Task(self, ip, arguments="-Pn", ports=None):
        result = self.nm.scan(ip, arguments=arguments)
        print(datetime.now(), result)

    def process_start_task(self, ip_list):
        loop = asyncio.get_event_loop()
        tasks = []
        for ip in ip_list:
            task = asyncio.ensure_future(self.Task(ip))
            tasks.append(task)
        loop.run_until_complete(asyncio.wait(tasks))

    def run(self):
        len_split = 1 if len(self.ip_list) // 8 <= 1 else len(self.ip_list)//8
        split_ip_lists = [self.ip_list[i:i + len_split] for i in range(0, len(self.ip_list), len_split)]
        with pool.Pool(8) as p:
            for split_ip_list in split_ip_lists:
                p.apply_async(self.process_start_task, args=(split_ip_list,))
            p.close()
            p.join()

if __name__ == '__main__':
    s1 = time.time()
    ip_list = ["192.168.137." + str(i) for i in range(1, 256)]
    s = Scan(ip_list)
    s.run()
    print(time.time()-s1)

感觉速度还是很慢…这两天在研究下!