"""Simple Python port scanner, implemented two ways.

Reconstructed from a blog post ("Simple Python port scan", test post,
published 2019-04-12, category: Python, about 2.9k characters, roughly a
3 minute read).  The post shows two approaches:

1. A work queue drained by multiple threads
   (``GetQueue`` / ``ScanThread`` / ``Work``).
   The author reported that this scan finished in 63 s.
2. Coroutines using gevent (``work``).
   The author reported that this scan needed 155 s.

Only scan hosts you are authorised to test.
"""

import sys
sys.path.append("..")
import queue
import socket
import threading
import time

lock = threading.Lock()  # thread lock: serialises console output of workers


# ---------------------------------------------------------------------------
# Approach 1: queue + multithreading
# ---------------------------------------------------------------------------

def GetQueue(host, ports=None):
    """Build the queue of ``(host, port)`` scan jobs.

    Args:
        host: Target host name or IP address, stored as-is in every job.
        ports: Optional iterable of port numbers to enqueue.  Defaults to
            every TCP port from 1 through 65535 inclusive.

    Returns:
        A ``queue.Queue`` holding one ``(host, port)`` tuple per port.
    """
    if ports is None:
        # The upper bound is exclusive, so 65536 is needed to include 65535.
        ports = range(1, 65536)
    PortQueue = queue.Queue()
    for port in ports:
        PortQueue.put((host, port))
    return PortQueue


class ScanThread(threading.Thread):
    """Daemon worker thread that drains a job queue and probes each port."""

    def __init__(self, SingleQueue, outip):
        """Store the shared queues.

        Args:
            SingleQueue: Queue of ``(host, port)`` jobs shared by all workers.
            outip: Sink for open ports.  When it has a callable ``put``
                (e.g. a ``queue.Queue``) every open ``(host, port)`` is put
                into it; any other value is ignored.
        """
        threading.Thread.__init__(self)
        self.daemon = True  # do not keep the interpreter alive on exit
        self.SingleQueue = SingleQueue
        self.outip = outip

    def ping(self, scanIP, port):
        """Try a TCP connect to ``(scanIP, port)`` with a 1 second timeout.

        Prints ``scanIP port`` when the connection succeeds.

        Returns:
            True if the connection succeeded, False otherwise.
        """
        try:
            # The context manager closes the socket on every path.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(1)
                sock.connect((scanIP, port))
        except (OSError, OverflowError):
            # OSError covers refused/timeout/unreachable/resolution errors;
            # OverflowError is raised for a port outside 0-65535.
            return False
        with lock:  # keep output lines from different threads intact
            print(scanIP, port)
        return True

    def run(self):
        """Process jobs until the shared queue is exhausted."""
        report = getattr(self.outip, "put", None)
        while True:
            try:
                # A non-blocking get avoids the empty()/get() race in which
                # another worker takes the last job and this one blocks
                # forever.
                host, port = self.SingleQueue.get_nowait()
            except queue.Empty:
                return
            if self.ping(host, port) and callable(report):
                report((host, port))


class Work():
    """Threaded scan job for a single target."""

    def __init__(self, scan_id='', scan_target='', scan_type='',
                 scan_args='', back_fn=None):
        """Record the job description; ``result`` starts out empty."""
        self.scan_id = scan_id
        self.target = scan_target
        self.scan_type = scan_type
        self.args = scan_args
        self.back_fn = back_fn
        self.result = []

    def run(self, thread_count=200, ports=None):
        """Scan ``self.target`` and store the open ports in ``self.result``.

        Args:
            thread_count: Number of worker threads to start (default 200).
            ports: Optional iterable of ports; defaults to 1-65535.

        After the call ``self.result`` is the sorted list of open
        ``(host, port)`` tuples.
        """
        SingLeQueue = GetQueue(self.target, ports)
        resultQueue = queue.Queue()
        ThreadList = [ScanThread(SingLeQueue, resultQueue)
                      for _ in range(thread_count)]
        for t in ThreadList:
            t.start()
        for t in ThreadList:
            t.join()

        # All workers have finished, so the result queue can be drained
        # without blocking.
        found = []
        while True:
            try:
                found.append(resultQueue.get_nowait())
            except queue.Empty:
                break
        self.result = sorted(found)


# ---------------------------------------------------------------------------
# Approach 2: coroutines (gevent)
# ---------------------------------------------------------------------------

class work():
    """Coroutine based scan job for a single target, built on gevent."""

    def __init__(self, scan_target=""):
        """Create a pool of 200 greenlets for scanning ``scan_target``.

        gevent is imported here rather than at module level so that the
        threaded scanner stays usable when gevent is not installed.
        """
        from gevent import socket as gsocket
        from gevent.pool import Pool

        # gevent's cooperative socket module is used directly, so no
        # process-wide monkey patching is needed.
        self._gsocket = gsocket
        self.pool = Pool(200)
        self.timeout = 0.1
        self.target = scan_target

    def port_scan(self, port):
        """Probe one port; print ``target : port`` when it is open."""
        try:
            # The context manager closes the socket on success as well as on
            # failure.
            with self._gsocket.socket(socket.AF_INET,
                                      socket.SOCK_STREAM) as sock:
                sock.settimeout(self.timeout)
                sock.connect((self.target, port))
        except (OSError, OverflowError):
            return
        print(self.target, ":", port)

    def run(self):
        """Probe every port from 1 through 65535 using the greenlet pool."""
        self.pool.map(self.port_scan, range(1, 65536))


# ---------------------------------------------------------------------------
# Demo drivers (the timing runs shown in the post)
# ---------------------------------------------------------------------------

def _run_threaded_demo():
    """Time the threaded scan against the target used in the post."""
    start_time = time.time()
    t = Work(scan_target="111.7.163.233")
    t.run()
    print('%d second' % (time.time() - start_time))


def _run_gevent_demo():
    """Time the gevent scan against localhost."""
    start_time = time.time()
    t = work('127.0.0.1')
    t.run()
    print('%d second' % (time.time() - start_time))


def main():
    """Run both demos in the order they appear in the post."""
    _run_threaded_demo()
    _run_gevent_demo()


if __name__ == "__main__":
    # Guarded so that importing this module never starts a scan.
    main()