Python 简易端口扫描

使用队列 多线程方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import sys
sys.path.append("..")

import threading,socket,queue
import time

lock = threading.Lock() # 线程锁

# 扫描端口队列
def GetQueue(host):
PortQueue = queue.Queue()
for port in range(1,65535):
PortQueue.put((host,port))
return PortQueue

class ScanThread(threading.Thread):
def __init__(self,SingleQueue,outip):
threading.Thread.__init__(self)
self.daemon = True
# self.setDaemon(True)
self.SingleQueue = SingleQueue
self.outip = outip
def ping(self,scanIP,port):
global OpenPort,Lock
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.settimeout(1)
address = (scanIP,port)
try:
sock.connect(address)
except:
sock.close()
return False
sock.close()
print(scanIP, port)
if lock.acquire():
lock.release()
return True
def run(self):
while not self.SingleQueue.empty():
# print(self.SingleQueue.get())
host,port = self.SingleQueue.get()
self.ping(host,port)

class Work():
def __init__(self,scan_id = '',scan_target='',scan_type = '',scan_args='',back_fn=None):
self.scan_id = scan_id
self.target = scan_target
self.scan_type = scan_type
self.args = scan_args
self.back_fn = back_fn
self.result= []
def run(self):
ThreadList = []
SingLeQueue = GetQueue(self.target)
resultQueue = queue.Queue()
for i in range(0,200):
t = ScanThread(SingLeQueue,resultQueue)
ThreadList.append(t)
for t in ThreadList:
t.start()
for t in ThreadList:
t.join()
start_time = time.time()
t = Work(scan_target = "111.7.163.233")
t.run()
print('%d second' % (time.time() - start_time))

执行完成63S

使用协程的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import gevent
from gevent import monkey
from gevent.pool import Pool
monkey.patch_all()
import socket,time

class work():
def __init__(self,scan_target = ""):
self.pool = Pool(200)
self.timeout=0.1
self.target = scan_target
def port_scan(self,port):
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.settimeout(self.timeout)
try:
sock.connect((self.target,port))
print(self.target,":",port)
except:
sock.close()
def run(self):
res = []
for port in range(1,65535):
res.append(port)
self.pool.map(self.port_scan,res)

start_time = time.time()
t = work('127.0.0.1')
t.run()
print('%d second' % (time.time() - start_time))

扫描完成需155S

-------------本文结束感谢您的阅读-------------