更多滲透技能 ,公眾號:滲透師老A
作者:暗箭
腳本背景:
我剛開始寫的第一代代理池 單純為了練手,只具備批量爬取代理的功能,
后來我開始寫第二代,批量爬取加檢測代理否可用,將可用代理輸出,不可用代理pass,
這是根據我自身需求寫的,短短幾十行代碼清晰可辨,
很明顯,這有很多不足,
如果請求的代理原本是一個可用代理,但因為首次請求出現意外,導致請求失敗,那么一個明明可以為我們所用的代理,就這樣被我們錯過了,
不具備輪詢變動的功能,不具備配合工具來回跳動ip的功能……
為了滿足大家的需求,我開始參考網上大佬們的思路,潛心研究,通宵達旦,廢寢忘食,夜以繼日…………
ε=ε=ε=( ̄▽ ̄)經過幾十次除錯 終于寫出了第三代【豪華版】代理池,

【第三代代理池優點】:
1.輪詢變動IP
2.適者生存,不適者淘汰
3.異步處理,無需等待
4.sqlmap搭檔神器,IP跳來跳去
5.無需配置資料庫,WEB AIP輕松解決
好了,我不多BB,直接步入正題
實作代理池輪詢變動-從第一個字母開始敲起
一.代理池的實作條件:
(1).我們需要安裝幾個python中的庫,安裝起來很簡單,pip3 install “庫名”,
以下是需要用到的庫,其中包含系統自帶庫,大家可根據自身情況選擇安裝,(若缺少相應的模板,在腳本執行時會有報錯提示的,可根據提示補充安裝)
當然,有問題可以留言或私聊我
Redis,redis-dump
Pyquery
urllib
random
asyncio
aiohttp
botocore
multiprocessing
(2).安裝Redis-x64-3.0.504.msi,若不安裝則腳本運行時會報錯:Error 10061 connecting to 127.0.0.1:6379. 由于目標計算機積極拒絕,無法連接
報錯原因:Redis服務沒有啟動
安裝教程:github下載,下載速度極慢,需要搭梯子,考慮到本篇文章不能涉及翻墻,所以我將我搭梯子下載好的Redis-x64-3.0.504.msi放到下方的附件中,供大家安裝,

二.代理思路
(1).模塊創建
本次代理池全面升級,為了實作功能的多樣性,以及保證腳本的穩定性,可讀性,
我將創建6個模塊腳本,實作從代理池運行——》爬取代理——》存盤——》檢測——》介面——》呼叫等功能,
(2).實作代理維護
使用代理賦值法,將批量獲取的免費代理統一賦初始值為10,并存入資料庫中,通過檢測模塊向代理服務器發送請求,若首次請求成功,則將該代理初始值提升至100,若首次請求失敗則將初始值減1,若代理值減為0,則將代理從代理池中移除,
(3.)代理呼叫
通過WEB API介面,拿到隨機可用代理,根據我們為代理賦加的值,優先獲取最高值代理(值越高越穩定),若無最高值代理,則根據值的大小進行排名,優先輸出排名最靠前的代理,供我們使用,
三.代理撰寫
說實話……我真不能一點一點告訴你們每句代碼的意思,因為代碼太多了,所以保姆式教程在本篇文章中就不適用了,還望大家理解,但我一定能保證大家看得懂本篇文章,并且在看完本篇文章后,能夠獲得如何撰寫自己代理池的思路,
小白同學就要受點委屈了~不過我相信本篇文章對小白的提升一定是巨大的!!!
存盤模塊撰寫:
(1).將爬取的代理存盤到Redis資料庫中,通過定義一個類來操作Redis的有序集合,
(2).我們前面說過要為代理賦初始值,并且根據賦值大小進行排序,所以我們不得不呼叫Redis有序集合來滿足我們的要求,
什么是Redis有序集合:
Redis 有序集合和集合一樣也是string型別元素的集合,且不允許重復的成員,不同的是每個元素都會關聯一個double型別的分數,
redis正是通過分數來為集合中的成員進行從小到大的排序,
這里的double型別分數就是我們為代理賦的值,
現在我們要定義一個類,一些方法 和一些常量來實作代理的存盤,
創建db.py為存盤模塊: 下面內容需要我們對Redis庫的函式有所了解,要不然是看不懂的,
python
coding=gbk
存盤模塊
首先來定義一些常量:
python
MAX_SCORE = 100 #最大值
MIN_SCORE = 0 #最小值
INITIAL_SCORE = 10 #初始值
REDIS_HOST = 'localhost' #Redis連接IP
REDIS_PORT = 6379 #Redis連接埠
REDIS_PASSWORD = None #連接密碼,大家根據自己需求選擇
REDIS_KEY = 'proxies' #有序集合鍵名,獲取代理存盤使用的有序集合
呼叫庫并創建類和方法:
python
import redis #實作Redis的連接及使用
from random import choice #回傳一個串列,元組或字串的隨機項
class RedisClient(object):
def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
"""
初始化
:param host: Redis 地址
:param port: Redis 埠
:param password: Redis密碼
"""
self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)
這里有個小細節:self.db是定義一個實體,用來連接資料庫的,里面有一個decode_responses引數值為True 這是因為當我們要求回傳鍵值時,若不加此條件則會回傳——b’Value’ ,回傳結果會有一個跟屁蟲b,b代表為byte資料型別,所以當添加decode_responses=True時,回傳結果就不會有跟屁蟲了——’Valie’
python
def add(self, proxy, score=INITIAL_SCORE):
"""
添加代理,設定分數為最高
:param proxy: 代理
:param score: 分數
:return: 添加結果
"""
if not self.db.zscore(REDIS_KEY, proxy):
return self.db.zadd(REDIS_KEY, score, proxy)
add()實作為批量獲取的代理附加初始值10,
def random(self):
"""
隨機獲取有效代理,首先嘗試獲取最高分數代理,如果不存在,按照排名獲取,否則例外
:return: 隨機代理
"""
result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
if len(result):
return choice(result)
else:
result = self.db.zrevrange(REDIS_KEY, 0, 100)
if len(result):
return choice(result)
else:
raise PoolEmptyError
第一個result包含所有值為100的代理(最高效代理),第二個result就是矬子里拔將軍,但也很不錯的,
def decrease(self, proxy):
"""
代理值減一分,小于最小值則洗掉
:param proxy: 代理
:return: 修改后的代理分數
"""
score = self.db.zscore(REDIS_KEY, proxy)
if score and score > MIN_SCORE:
print('代理', proxy, '當前分數', score, '減1')
return self.db.zincrby(REDIS_KEY, proxy, -1)
else:
print('代理', proxy, '當前分數', score, '移除')
return self.db.zrem(REDIS_KEY, proxy)
Redis基礎語法不講了,實作無用代理移除功能
def exists(self, proxy):
"""
判斷是否存在
:param proxy: 代理
:return: 是否存在
"""
return not self.db.zscore(REDIS_KEY, proxy) == None
#簡略寫法,當代理池無代理時 回傳not
def max(self, proxy):
"""
將代理設定為MAX_SCORE
:param proxy: 代理
:return: 設定結果
"""
print('代理', proxy, '可用,設定為', MAX_SCORE)
return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)
def count(self):
"""
獲取數量
:return: 數量
"""
return self.db.zcard(REDIS_KEY)
def all(self):
"""
獲取全部代理
:return: 全部代理串列
"""
return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)
創建crawler.py為獲取模塊:
從各大網站批量爬去代理,比較簡單 不做過多說明 以前文章有涉及過爬蟲的原理,
呼叫庫及函式
from pyquery import PyQuery as pq #決議
import urllib.request #請求
創建一個類,
class ProxyMetaclass(type):
def __new__(cls, name, bases, attrs):
count = 0
attrs['__CrawlFunc__'] = []
for k, v in attrs.items():
if 'crawl_' in k:
attrs['__CrawlFunc__'].append(k)
count += 1
attrs['__CrawlFuncCount__'] = count
return type.__new__(cls, name, bases, attrs)
class Crawler(object, metaclass=ProxyMetaclass):
def get_proxies(self, callback): #callback = crawl_daili66 就是下面定義的獲取代理的方法名稱
proxies = []
for proxy in eval("self.{}()".format(callback)):
print('成功獲取到代理', proxy)
proxies.append(proxy)
return proxies
其實這里借助于元類來實作【kk……對于元類我了解的也不深】,
這是attrs字典形式 K為鍵 V為值,當中包含我們定義的方法名稱 如圖:

解釋代碼:
定義了一個 ProxyMetaclass,Crawl 類將它設定為元類,元類中實作了 new() 方法,這個方法有固定的幾個引數,其中第四個引數 attrs 中包含了類的一些屬性,這其中就包含了類中方法的一些資訊,我們可以遍歷 attrs 這個變數即可獲取類的所有方法資訊,
所以在這里我們在 new() 方法中遍歷了 attrs 的這個屬性,就像遍歷一個字典一樣,鍵名對應的就是方法的名稱,接下來判斷其開頭是否是 crawl_,
如果是,則將其加入到 CrawlFunc 屬性中,這樣我們就成功將所有以 crawl 開頭的方法定義成了一個屬性,就成功動態地獲取到所有以 crawl 開頭的方法串列了,
下一步定義方法:主要是用爬蟲來爬取代理:
這里我們要記住,為了實作多方法爬取不同代理網站的想法,我們必須統一規定被定義的方法以”crawl_”開頭
def crawl_daili66(self,page_count=4):
start_url = 'http://www.66ip.cn/{}.html'
urls = [start_url.format (page) for page in range(1, page_count + 1)]
for url in urls:
print('Crawling', url)
req = urllib.request.Request(url=url)
res = urllib.request.urlopen(req)
html = res.read()
if html:
doc = pq(html)
trs = doc('.containerbox table tr:gt(0)').items()
for tr in trs:
ip = tr.find('td:nth-child(1)').text()
port = tr.find('td:nth-child(2)').text()
yield ':' .join([ip,port])
def crawl_proxyXH(self):
start_url = "http://www.89ip.cn/index_{}.html"
urls = [start_url.format(page) for page in range(1,10)]
headers = {'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0'}
for url in urls:
req = urllib.request.Request(url=url,headers=headers)
res = urllib.request.urlopen(req)
html = res.read()
if html:
doc = pq(html)
trs = doc('.layui-row table tr:gt(0)').items()
for tr in trs:
ip = tr.find('td:nth-child(1)').text() #tr節點下第一個td子節點的文本內容
port = tr.find('td:nth-child(2)').text()
yield ':' .join([ip,port])
別看上面代碼這么長,耐心看一眼就能看懂,
唯一要點一下的一處是,這里的tr:gt(0)容易理解錯,這里使用gt(0),并不是說包含了從第一個tr節點到最后一個tr節點的所有節點,而是包含首個tr節點的下一個兄弟節點及其后的所有tr節點,
換句話說,就是除了第一個tr節點外 其他的tr節點都包含其中
我找了兩個比較好用且訪問速度比較快的網站,之所以拋棄”小幻代理”是因為容易出現訪問延遲的現象,影響腳本的穩定性,
第二個代理網站需要設定請求頭的,要不然會禁止訪問,
爬蟲方法是如何運行的見下圖,以66ip為例:

逐層鎖定,遍歷tr節點 篩選子節點 都是爬蟲基礎,不過多解釋,不懂可以私下問我,
我們還需要創建一個getter.py的模塊,用來動態呼叫所有以crawl_開頭的方法,然后抓取代理,存盤到資料庫中,
getter.py模塊:
因為涉及到獲取和存盤 所以直接從先前兩個模塊中呼叫已經創建好的兩個類
from db import RedisClient
from crawler import Crawler
限制代理池的最大容存量為10000
POOL_UPPER_THRESHOLD = 10000
定義Getter類,創建實體,為了呼叫其中類的函式
class Getter():
def __init__(self):
self.redis = RedisClient()
self.crawler = Crawler()
def is_over_threshold(self):
"""
呼叫RedisClient中count()函式
判斷是否達到了代理池限制
"""
if self.redis.count() >= POOL_UPPER_THRESHOLD:
return True
else:
return False
def run(self):
print('獲取器開始執行')
if not self.is_over_threshold():
for callback_label in range(self.crawler.__CrawlFuncCount__): #從串列獲取所有包含crawl_的方法
callback = self.crawler.__CrawlFunc__[callback_label]
proxies = self.crawler.get_proxies(callback) #承接獲取模塊中的函式,獲取代理
for proxy in proxies:
self.redis.add(proxy) #存盤到redis資料庫中
定義了 is_over_threshold() 方法判斷代理池是否已經達到了容量閾值,它就是呼叫了 RedisClient 的 count() 方法獲取代理的數量,然后加以判斷,如果數量達到閾值則回傳 True,否則 False,
如果不想加這個限制可以將此方法永久回傳 True,
接下來定義了 run() 方法,首先判斷了代理池是否達到閾值,然后在這里就呼叫了 Crawler 類的 CrawlFunc 屬性,獲取到所有以 crawl 開頭的方法串列,依次通過 get_proxies() 方法呼叫,得到各個方法抓取到的代理,然后再利用 RedisClient 的 add() 方法加入資料庫
創建tester.py為檢測模塊:
在上述db.py crawler.py getter.py三個模塊中,我們已經能夠成功獲取代理并且將其放入資料庫中,
然后就需要一個檢測模塊來對所有的代理進行一輪輪的檢測,檢測可用就設定為 100,不可用就分數減 1,這樣就可以實時改變每個代理的可用情況,在獲取有效代理的時候只需要獲取分數高的代理即可,
由于代理的數量非常多,為了提高代理的檢測效率,我們在這里使用異步請求庫 Aiohttp 來進行檢測,
為什么要用Aiohttp呢,來回想一下,我們在請求單個網址的時候通常習慣使用requests來請求,而Requests 作為一個同步請求庫,我們在發出一個請求之后需要等待網頁加載完成之后才能繼續執行程式,
也就是這個程序會阻塞在等待回應這個程序,如果服務器回應非常慢,比如一個請求等待十幾秒,那么我們使用 Requests 完成一個請求就會需要十幾秒的時間,中間其實就是一個等待回應的程序,程式也不會繼續往下執行,
而Aiohttp異步請求庫便完美的解決了這個問題,在請求發出之后,程式可以繼續接下去執行去做其他的事情,當回應到達時,會通知程式再去處理這個回應,這樣程式就沒有被阻塞,充分把時間和資源利用起來,大大提高效率,
所以在這里我們的代理檢測使用異步請求庫 Aiohttp,實作示例如下:
VALID_STATUS_CODES = [200]
TEST_URL = 'http://www.baidu.com'
BATCH_TEST_SIZE = 100
設定好狀態碼200為目標服務器已經處理了請求,
BATCH_TEST_SIZE設定好一次檢測的最大代理量,這里一次最多檢測100個代理
TEST_URL:使用該網站進行檢測,可以設定為一個不會封 IP 的網站,百度就很不錯哦~
from db import RedisClient
import asyncio #用來撰寫 并發 代碼的庫
import aiohttp
import time
init() 方法中建立了一個 RedisClient 物件,供類中其他方法使用
class Tester(object):
def __init__(self):
self.redis = RedisClient()
接下來定義了一個 test_single_proxy() 方法,用來檢測單個代理的可用情況,其引數就是被檢測的代理,
async def test_single_proxy(self, proxy):
conn = aiohttp.TCPConnector(verify_ssl=False) #用于使用TCP處理HTTP和HTTPS的連接器
async with aiohttp.ClientSession(connector=conn) as session:
try:
if isinstance(proxy, bytes):
proxy = proxy.decode('utf-8')
real_proxy = 'http://' + proxy
print('正在測驗', proxy)
async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response: #呼叫get()請求代理
if response.status in VALID_STATUS_CODES:
self.redis.max(proxy)
print('代理可用', proxy)
else:
self.redis.decrease(proxy)
print('請求回應碼不合法', proxy)
except :
self.redis.decrease(proxy)
print('代理請求失敗', proxy)
注意這個方法前面加了 async 關鍵詞,代表這個方法是異步的,方法內部首先創建了 Aiohttp 的 ClientSession 物件,此物件類似于 Requests 的 Session 物件,可以直接呼叫該物件的 get() 方法來訪問頁面,
在這里代理的設定方式是通過 proxy 引數傳遞給 get() 方法,請求方法前面也需要加上 async 關鍵詞標明是異步請求,這也是 Aiohttp 使用時的常見寫法,
def run(self):
print('測驗器開始運行')
try:
proxies = self.redis.all() # 所有的代理
loop = asyncio.get_event_loop() #事件回圈的獲取
# 批量測驗
for i in range(0, len(proxies), BATCH_TEST_SIZE):
test_proxies = proxies[i:i + BATCH_TEST_SIZE]
tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
loop.run_until_complete(asyncio.wait(tasks))
time.sleep(5)
except Exception as e:
print('測驗器發生錯誤', e.args)
Aiohttp語法我不了解,這部分直接扒書上的代碼用,
創建api.py介面模塊:
提到介面大家想到的肯定就是API,那么這里為什么不使用其他資料庫,比如上面的Redis資料庫,而是呼叫WEB API來實作介面模塊的運行呢?
(1).資料庫密碼泄露風險
(2).為了遠程連接代理池
(3).便于同步更新
這樣獲取代理只需要請求一下介面即可,以上的幾個缺點弊端可以解決,
我們在這里使用一個比較輕量級的庫 Flask 來實作這個介面模塊,實作示例如下:
from flask import Flask, g #呼叫flask庫
from db import RedisClient #呼叫類
這是個小知識點all是個變數串列,我們看到all等于[‘app’]意思就是說 ,在本模塊中,若不參考該模塊,則只允許執行app函式-> Flask(name)
__all__ = ['app']
app = Flask(__name__)
初始化:所有的Flask都必須創建程式實體,
web服務器使用wsgi協議,把客戶端所有的請求都轉發給這個程式實體
程式實體是Flask的物件,一般情況下用如下方法實體化
Flask類只有一個必須指定的引數,即程式主模塊或者包的名字,name是系統變數,該變數指的是本py檔案的檔案名
def get_conn():
if not hasattr(g, 'redis'):
g.redis = RedisClient()
return g.redis
@app.route('/')
def index():
return '<h2>Welcome to Proxy Pool System</h2>'
@app.route('/random')
def get_proxy():
conn = get_conn()
return conn.random()
@app.route('/count')
def get_counts():
conn = get_conn()
return str(conn.count())
客戶端發送url給web服務器,web服務器將url轉發給flask程式實體,程式實體
需要知道對于每一個url請求啟動那一部分代碼,所以保存了一個url和python函式的映射關系,
處理url和函式之間關系的程式,稱為路由
在flask中,定義路由最簡便的方式,是使用程式實體的app.route裝飾器,把裝飾的函式注冊為路由
if __name__ == '__main__':
app.run()
最后一個模塊 我們稱之為調度模塊,
創建run.py調度模塊:
這個模塊其實就是呼叫以上所定義的三個模塊,將以上三個模塊通過多行程的形式運行起來,示例如下:
TESTER_CYCLE = 20
GETTER_CYCLE = 20
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True
在這里還有三個常量,TESTER_ENABLED、GETTER_ENABLED、API_ENABLED 都是布爾型別,True 或者 False,標明了測驗模塊、獲取模塊、介面模塊的開關,如果為 True,則代表模塊開啟,
import time
from multiprocessing import Process
from api import app #呼叫介面模塊
from getter import Getter #呼叫獲取模塊(2) crawler.py與getter皆為獲取模塊
from tester import Tester #呼叫檢測模塊
三個調度方法結構也非常清晰,比如 schedule_tester() 方法,這是用來調度測驗模塊的方法,首先宣告一個 Tester 物件,然后進入死回圈不斷回圈呼叫其 run() 方法,執行完一輪之后就休眠一段時間,休眠結束之后重新再執行,在這里休眠時間也定義為一個常量,如 20 秒,這樣就會每隔 20 秒進行一次代理檢測,
class Scheduler():
def schedule_tester(self, cycle=TESTER_CYCLE):
tester = Tester()
while True:
print('測驗器開始運行')
tester.run()
time.sleep(cycle)
#每隔20秒從資料庫獲取一次代理
def schedule_getter(self, cycle=GETTER_CYCLE):
getter = Getter()
while True:
print('開始抓取代理')
getter.run()
time.sleep(cycle)
def schedule_api(self):
app.run('127.0.0.1','5000') #這里要看分配,我這兒分配的是5000埠也就是AIP_PORT,這個埠怎么看,大家可以直接執行aip.py模塊

啟動入口是 run() 方法,其分別判斷了三個模塊的開關,如果開啟的話,就新建一個 Process 行程,設定好啟動目標,然后呼叫 start() 方法運行,這樣三個行程就可以并行執行,互不干擾,
def run(self):
print('代理池開始運行')
if TESTER_ENABLED:
tester_process = Process(target=self.schedule_tester)
tester_process.start()
if GETTER_ENABLED:
getter_process = Process(target=self.schedule_getter)
getter_process.start()
if API_ENABLED:
api_process = Process(target=self.schedule_api)
api_process.start()
運行的時候要這樣寫:
if __name__=='__main__':
Scheduler().run()
否則就會出現一個報錯提示

四.代理池的運行
(1).首先要保證Redis服務開啟,若沒開啟的話請下載并安裝Redis-x64-3.0.504.msi
安裝包已添加到附件中,點開直接無腦‘下一步‘,安裝成功后,找到.exe檔案所在目錄,雙擊執行即可,
Port:6379別動就行,腳本里面已經提前寫好了,

(2).回到run.py腳本 直接執行(保證所有模塊均在同一目錄下)
(3).執行效果:如圖




可見免費代理能用的還是很少的……所以還是建議有條件的同學取購買付費代理使用

以上就是代理池創建到實作的全部程序了,
有同學會問,我該怎樣將其輪詢變動到工具中,
比如,我用sqlmap掃描目標網址總是被封,該怎么利用腳本呢?
方法:sqlmap -u 目標網址 –porxy = http://127.0.0.1:5000/random
即可實作每掃描一次就,變動一次我們掃描器的IP地址,是不是很牛掰的樣子,
*原始碼及Redis安裝包統一已上傳博客的“下載資源處”,可自行下載


轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/310506.html
標籤:其他
