Python 简单的子域名存活判断

通过threading.Semaphore控制最大线程数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import requests
import threading
# 线程最大数量
# Cap concurrency: at most 100 worker threads probe hosts at the same time.
sem = threading.Semaphore(100)
# Shared accumulator of URLs that responded (list.append is atomic under the GIL).
cunhua = []

def URLIP(url):
    """Probe *url* on ports 80 and 443 and record the first URL that answers.

    Appends the reachable URL to the module-level ``cunhua`` list and stops
    after the first success.  Connection errors and timeouts are swallowed —
    an unreachable host is simply not recorded.
    """
    sem.acquire()
    try:
        for port in (80, 443):
            # BUG FIX: port 443 speaks TLS, so it must be probed with an
            # https:// scheme — the original plain-http probe on 443 could
            # never succeed.
            scheme = 'https' if port == 443 else 'http'
            url1 = "%s://%s:%s" % (scheme, url, port)
            try:
                requests.get(url1, timeout=4)
            except requests.RequestException:
                # Host did not answer on this port — try the next one.
                continue
            print(url1 + ":" + "存活")
            cunhua.append(url1)
            break
    finally:
        # BUG FIX: release exactly once per acquire.  The original released
        # inside the port loop, so a host that failed on both ports released
        # the semaphore twice, silently inflating the thread cap over time.
        sem.release()
if __name__ == '__main__':
    threads = []
    # One probe thread per input line; the semaphore inside URLIP caps how
    # many actually run concurrently even though all threads start at once.
    with open('test3.txt', 'r') as f:
        # Iterate the file directly instead of materializing readlines();
        # `with` already closes the file, so no explicit f.close() is needed.
        for line in f:
            threads.append(threading.Thread(target=URLIP, args=(line.strip(),)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Persist every URL that answered, one per line.
    with open('test4.txt', 'w+') as f:
        for alive in cunhua:
            f.write(alive.strip() + '\n')

过时的threadpool模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import requests
import time
import threadpool
# URLs that responded; appended to by worker threads (safe under the GIL).
domain_sucess = []

def URLIP(url):
    """Probe *url* on ports 80 and 443 and record the first URL that answers.

    Appends the reachable URL to the module-level ``domain_sucess`` list and
    stops after the first success.  Unreachable hosts are silently skipped.
    """
    for port in (80, 443):
        # BUG FIX: port 443 is TLS, so probe it over https:// — the original
        # plain-http probe on 443 could never succeed.  The dead
        # `finally: pass` from the original is dropped.
        scheme = 'https' if port == 443 else 'http'
        url1 = "%s://%s:%s" % (scheme, url, port)
        try:
            requests.get(url1, timeout=4)
        except requests.RequestException:
            # Host did not answer on this port — try the next one.
            continue
        print(url1 + ":" + "存活")
        domain_sucess.append(url1)
        break
if __name__ == '__main__':
    domain = []
    # `with` already closes the file; the original's explicit f.close() was
    # redundant.
    with open('test3.txt', 'r') as f:
        for line in f:
            domain.append(line.strip())
    start_time = time.time()
    # NOTE(review): the `threadpool` package is long unmaintained; its own
    # docs recommend concurrent.futures.ThreadPoolExecutor instead.  Kept
    # here to match the article's subject.
    pool = threadpool.ThreadPool(100)
    requests_1 = threadpool.makeRequests(URLIP, domain)
    # Plain loop instead of a throwaway list comprehension used only for
    # its side effect.
    for req in requests_1:
        pool.putRequest(req)
    pool.wait()
    print('%d second' % (time.time() - start_time))
    print(domain_sucess)

multiprocessing进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import requests
import time
import multiprocessing
# URLs that responded.  NOTE: when URLIP runs in a worker *process*, appends
# land in the child's copy of this list — results must travel back through
# the function's return value (see the gather loop in __main__ below).
domain_sucess = []

def URLIP(url):
    """Probe *url* on ports 80 and 443; return the list of URLs that answered.

    Stops after the first success.  Also appends to ``domain_sucess`` for
    backward compatibility with in-process callers, but that side effect is
    invisible across process boundaries.
    """
    alive = []
    for port in (80, 443):
        # Port 443 is TLS, so probe it over https:// — a plain-http probe
        # on 443 could never succeed.
        scheme = 'https' if port == 443 else 'http'
        url1 = "%s://%s:%s" % (scheme, url, port)
        try:
            requests.get(url1, timeout=4)
        except requests.RequestException:
            continue
        print(url1 + ":" + "存活")
        domain_sucess.append(url1)
        alive.append(url1)
        break
    return alive

if __name__ == '__main__':
    start_time = time.time()
    results = []
    pool = multiprocessing.Pool(100)
    with open('test3.txt', 'r') as f:
        for line in f:
            # Keep the AsyncResult handles so the workers' return values can
            # be collected after the pool drains.
            results.append(pool.apply_async(URLIP, (line.strip(),)))
    pool.close()
    pool.join()
    # BUG FIX: module-level lists are NOT shared between processes, so the
    # original `print(domain_sucess)` always printed [].  Gather the return
    # values from the workers instead.
    for res in results:
        domain_sucess.extend(res.get())
    print('%d second' % (time.time() - start_time))
    print(domain_sucess)

asyncio or aiohttp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import aiohttp
import time
# URLs that answered; filled in by the URLCUNHUA coroutines.
cunhuadomain = []

async def URLCUNHUA(url):
    """Probe *url* on ports 80 and 443 and record every URL that answers.

    Appends reachable URLs to the module-level ``cunhuadomain`` list;
    unreachable ports are skipped silently.
    """
    for port in (80, 443):
        # Port 443 is TLS, so probe it over https:// — a plain-http probe
        # on 443 could never succeed.
        scheme = 'https' if port == 443 else 'http'
        url1 = '%s://%s:%s' % (scheme, url, port)
        try:
            # A 4 s total timeout matches the other variants in this article;
            # without it a single dead host can stall the whole gather() for
            # the OS connect timeout (minutes).
            timeout = aiohttp.ClientTimeout(total=4)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url1):
                    cunhuadomain.append(url1)
        except Exception:
            # aiohttp raises many unrelated error types (DNS, connect, TLS,
            # asyncio.TimeoutError); any failure just means "not alive here".
            continue



if __name__ == '__main__':
    start = time.time()
    tasks = []
    # `with` already closes the file; the original's explicit f.close() was
    # redundant.
    with open('test3.txt', 'r') as f:
        for line in f:
            tasks.append(URLCUNHUA(line.strip()))

    async def _run_all():
        # gather() must be awaited inside a running loop; the coroutines in
        # `tasks` are only scheduled here, so creating them earlier is safe.
        await asyncio.gather(*tasks)

    # asyncio.run() replaces the deprecated
    # get_event_loop()/run_until_complete()/close() pattern.
    asyncio.run(_run_all())
    with open('test4.txt', 'w+') as f:
        for alive in cunhuadomain:
            # BUG FIX: the original wrote `i.strip` (the bound method object,
            # missing call parentheses), which raised
            # "TypeError: can only concatenate str" at runtime.
            f.write(alive.strip() + '\n')
    end = time.time()
    print(end - start)
-------------本文结束感谢您的阅读-------------