代理的使用
- 代理池的維護
- 代理池
- 付費代理
- ADSL撥號代理
代理池的維護
基本模塊:
- 存盤模塊: 使用Redis的有序集合負責存盤抓取下來的代理,保證代理不重復
- 獲取模塊:需要定時在各大代理網站抓取代理,代理的形式都是IP加埠,盡量從不同來源獲取,盡量抓取高匿代理,
- 檢測模塊:需要定時檢測資料庫中的代理,設定一個檢測鏈接,最好是爬取哪個網站就檢測哪個網站
- 介面模塊:需要用API來提供對外服務的介面,由于可用代理可能有多個,那么我們可以設定一個隨機回傳某個可用代理的介面,這樣就能保證每個可用代理都可以取到實作負載均衡
代理池
存盤模塊:使用redis的有序集合,有序集合的每個元素都有一個分數欄位,分數是可以重復的,根據分數對集合進行排序,可以作為判斷一個代理是否可用的標志,
操作資料庫的有序集合
MAX_SCORE=100
MIN_SCORE=0
INITIAL_SCORE=10
REDIS_HOST='localhost'
REDIS_PORT=6379
REDIS_PADDWORD=None
REDIS_KEY='proxies'
import redis
from random import choice
class RedisClient(object):
def __init__(self,host=REDIS_HOST,port=REDIS_PORT,password=REDIS_PADDWORD):
"""
初始化
:param host:Redis 地址
:param port: Redis 埠
:param password: Redis 密碼
"""
self.db=redis.StrictRedis(host=host,port=port,password=password,decode_responses=True)
def add(self,proxy,score=INITIAL_SCORE):
"""
添加代理,設定分數為最高
:param proxy: 代理
:param score: 分數
:return: 添加結果
"""
if not self.db.zscore(REDIS_KEY,proxy):
return self.db.zadd(REDIS_KEY,score,proxy)
#隨機獲取代理
def random(self):
"""
隨機獲取有效代理,首先嘗試獲取最高分數代理,如果最高分數不存在,則按照排名獲取,否則例外
:return: 隨即代理
"""
result=self.db.zrangebyscore(REDIS_KEY,MAX_SCORE,MAX_SCORE)
if len(result):
return choice(result)
else:
result=self.db.zrevrange(REDIS_KEY,0,100)
if len(result):
return choice(result)
else:
raise PoolEmptyError
#檢測代理無效時分數減1
def decrease(self,proxy):
"""
代理值減一分,分數小于最小值,則代理洗掉
:param proxy: 代理
:return: 修改后的代理分數
"""
score=self.db.zscore(REDIS_KEY,proxy)
if score and score > MIN_SCORE:
print('代理',proxy,'當前分數',score,'減1')
return self.db.zincrby(REDIS_KEY,proxy,-1)
else:
print('代理',proxy,'當前分數',score,'移除')
return self.db.zrem(REDIS_KEY,proxy)
#判斷代理是否在集合
def exists(self,proxy):
"""
判斷是否存在
:param proxy:代理
:return: 是否存在
"""
return not self.db.zscore(REDIS_KEY,proxy)==None
#設定代理分數
def max(self,proxy):
"""
將代理設定為MAX_SCORE
:param proxy: 代理
:return: 設定結果
"""
print('代理',proxy,'可用,設定為',MAX_SCORE)
return self.db.zadd(REDIS_KEY,MAX_SCORE,proxy)
#回傳當前集合的元素個數
def count(self):
"""
獲取數量
:return:數量
"""
return self.db.zcard(REDIS_KEY)
#回傳所有的代理串列
def all(self):
"""
獲取全部代理
:return: 全部代理串列
"""
return self.db.zrangebyscore(REDIS_KEY,MIN_SCORE,MAX_SCORE)
獲取模塊
import json
from .utils import get_page
from pyquery import PyQuery as pq
class ProxyMetaclass(type):
def __new__(cls, name,bases,attrs):
count=0
attrs['__CrawlFunc__']=[]
# 判斷方法開頭是否是Crawl,如果是則將其加入到__CrawlFunc__屬性中,就成功將所有以crawl開頭的方法定義成了一個屬性,動態獲取到所有以crawl開頭的方法串列
for k,v in attrs.items():
if 'crawl_'in K:
attrs['__CrawlFunc__'].append(k)
count+=1
attrs['__CrawlFuncCount__']=count
return type.__new__(cls,name,bases,attrs)
class Crawler(object,metaclass=ProxyMetaclass):
def get_proxies(self,callback):
proxies=[]
for proxy in eval("self.{}()".format(callback)):
print('成功獲取到代理',proxy)
proxies.append(proxy)
return proxies
def crawl_daili66(self,page_count=4):
"""
獲取代理66
:param page_count:頁碼
:return: 代理
"""
start_url='http://www.66ip.cn/{}/html'
urls=[start_url.format(page)for page in range(1,page_count+1)]
for url in urls:
print('Crawling',url)
html=get_page(url)
if html:
doc=pq(html)
trs=doc('.containerbox table tr:gt(0)').items()
for tr in trs:
ip=tr.find('td:nth-child(1)').text()
port=tr.find('td:nth-child(2)').text()
yield ':'.join([ip,port])
def crawl_proxy360(self):
"""
獲取Proxy360
:return: 代理
"""
start_url='http://www.proxy360.cn/Regio.China'
print('Crawling',start_url)
html=get_page(start_url)
if html:
doc=pq(html)
lines=doc('div[name="list_proxy_ip"]').items()
for line in lines:
ip=line.find('.tbBottomLine:nth-child(1)').text()
port=line.find('.tbBottomLine:nth-child(2)').text()
yield ':'.join([ip,port])
def crawl_goubanjia(self):
"""
獲取Goubanjia
:return: 代理
"""
start_url='http://www.goubanjia.com/free/gngn/index.shtml'
html=get_page(start_url)
if html:
doc=pq(html)
tds=doc('td.ip').items()
for td in tds:
td.find('p').remove()
yield td.text().replace(' ','')
#定義一個Getter類,用來動態呼叫所有以crawl開頭的方法
from db import RedisClient
from crawler import Crawler
POOL_UPPER_THRESHOLD=10000
class Getter():
def __init__(self):
self.redis=RedisClient()
self.crawler=Crawler()
def is_over_threshold(self):
"""
判斷你是否達到了代理池的限制
:return:
"""
if self.redis.count()>=POOL_UPPER_THRESHOLD:
return True
else:
return False
def run(self):
print('獲取器開始執行')
if not self.is_over_threshold():
for callback_label in range(self.crawler.__CrawFuncCount__):
callback=self.crawler.CrawlFunc__[callback_label]
proxies=self.crawler.get_proxies(callback)
for proxy in proxies:
self.redis.add(proxy)
檢測模塊
VALID_STATUS_CODES=[200]
TEST_URL='http://www.baidu.com'
BATCH_TEST_SIZE=100
class Tester(object):
def __init__(self):
self.redis=RedisClient()
async def test_single_proxy(self,proxy):
"""
測驗單個代理
:param proxy: 單個代理
:return: None
"""
conn=aiohttp.TCPConnector(verify_sal=False)
async with aiohttp.ClientSession(connector=conn)as session:
try:
if isinstance(proxy,bytes):
proxy=proxy.decode('utf-8')
real_proxy='http://'+proxy
print('正在測驗',proxy)
async with session.get(TEST_URL,proxy=real_proxy,timeout=15) as response:
if response.status in VALID_STATUS_CODES:
self.redis.max(proxy)
print('代理可用',proxy)
else:
self.redis.decrease(proxy)
print('請求回應碼不合法',proxy)
except(ClientError,clientConnectorError,TimeoutError,AttributeError):
self.redis.decrease(proxy)
print('代理請求失敗',proxy)
#獲取所有的代理串列
def run(self):
"""
測驗主函式
:return: None
"""
print('測驗器開始運行')
try:
proxies=self.redis.all()
loop=asyncio.get_event_loop()
#批量測驗
for i in range(0,len(proxies),BATCH_TEST_SIZE):
test_proxies=proxies[i:i+BATCH_TEST_SIZE]
tasks=[self.test_single_proxy(proxy) for proxy in test_proxies]
loop.run_until_complete(asyncio.wait(tasks))
time.sleep(5)
except Exception as e:
print('測驗器發送錯誤',e.args)
介面模塊
from flask import Flask,g
from db import RedisClient
__all__=['app']
app=Flask(__name__)
def get_conn():
if not hasattr(g,'redis'):
g.redis=RedisClient()
return g.redis
@app.route('/')
def index():
return '<h2>Welcome to Proxy Pool System</h2>'
@app.route('/random')
def get_proxy():
"""
獲取隨機可用代理
:return: 隨機代理
"""
conn=get_conn()
return conn.random()
@app.route('/count')
def get_counts():
"""
獲取代理池總量
:return: 代理池總量
"""
conn=get_conn()
return str(conn.count())
調度模塊
TEST_CYCLE=20
GETTER_CYCLE=20
TESTER_ENABLD=True
GETTER_ENABLD=True
API_ENABLD=True
from multiprocessing import Process
from api import app
from getter import Getter
from tester import Tester
class Scheduler():
def schedule_tester(self,cycle=TEST_CYCLE):
"""
定時測驗代理
:param cycle:
:return:
"""
tester=Tester()
while True:
print('測驗器開始運行')
tester.run()
time.sleep(cycle)
def schedule_api(self):
"""
開啟API
:return:
"""
app.run(API_HOST,API_PORT)
def run(self):
print('代理池開始運行')
if TESTER_ENABLD:
tester_process=Process(target=self.schedule_tester)
tester_process.start()
if GETTER_ENABLD:
getter_process=Process(target=self.schedule_getter)
getter_process=start()
if API_ENABLD:
api_process=Process(target=self.schedule_api())
api_process.start()
if __name__ == '__main__':
app.run()
付費代理
- 訊代理:提供介面獲取海量代理,按天或者按量收費,
- 阿布云代理:搭建了代理隧道,直接設定固定域名代理,
ADSL撥號代理
ADSL通過撥號的方式上網,需要輸入ADSL賬號和密碼,每次撥號就更換一個IP,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/260022.html
標籤:其他
上一篇:Adapter配接器模式
下一篇:React基礎知識2
