自制python爬蟲程式模板(爬蟲小白亦可用)
- 1.mysql資料庫鏈接
- 2. 頁面請求程序
- 3. 資料提取處理
- 4. 資料保存處理,
??在平時揮手大干專案的程序中,時不時會有一些小的爬蟲任務需要處理,因此專門寫了一個爬蟲框架,基本覆寫平常用到的網站,覺得使用效果不錯,分享出來給大家使用,也請各路大神走過路過提些好的意見,
??接下來為大家簡單介紹一下每個模塊實作程序及思路,本文結束后處會附全部代碼,前面代碼只是便于大家理解,無需挨個粘貼,
1.mysql資料庫鏈接
??本程式使用mysql資料庫讀取和保存資料,為了作業程序中的安全和方便,我們通過另外一個程式將資料庫鏈接賬號密碼等資料,保存中windows注冊表中,可通過 win+regedit 調出查看,(此塊僅適用于windows系統,若需在linux上使用,則不使用此模塊鏈接資料庫),本模塊中資料庫鏈接方式見代碼:
def read_setttings_zhuce(self, file, winn_c_u=winreg.HKEY_CURRENT_USER):
"""
讀取注冊表中的設定
"""
parentkey = winreg.OpenKey(winn_c_u, file)
# 獲取該鍵的所有鍵值,因為沒有方法可以獲取鍵值的個數,所以只能用這種方法進行遍歷
item = dict()
try:
i = 0
while True:
# EnumValue方法用來列舉鍵值,EnumKey用來列舉子鍵
name, value, type = winreg.EnumValue(parentkey, i)
item[name] = value
i += 1
except Exception as e:
pass
return item
def __init__(self, start_p):
# 注意,super().__init__() 一定要寫
# 而且要寫在最前面,否則會報錯,
super().__init__()
self.item_fwq = self.read_setttings_zhuce("Software\lxl\lxl_program")
# 鏈接資料庫
self.conn = pymysql.connect(
user=self.item_fwq["user"], password=self.item_fwq["password"], host=self.item_fwq["host"], port=int(self.item_fwq["port"]),
database=self.item_fwq["database"], use_unicode=True,
charset="utf8")
self.start_p = start_p
print("資料庫開啟中......")
# 獲取游標
self.cursor = self.conn.cursor()
2. 頁面請求程序
??此處說明一下,整個模塊是通過dict來傳遞資料的,因此在使用程序中,可以隨時隨地添加我們需要傳遞的引數,我們平常用到的頁面一般是get或post請求方式,get方式通過修改傳遞的url鏈接即可請求獲取資料,post方式通過data引數傳遞獲取資料,因此將兩種方式分開處理,同時將請求回來的資料做deocde解碼處理,一般遇到的有utf8或者GBK的,我寫了兩種,如果你們使用程序中出現其他的解碼,添加上去即可,此處代碼比較low我就不貼在此處了,各位結尾處直接復制即可,(我貼幾行重點吧,否則好像顯得此處特殊),
item_fwq_ip = read_setttings_zhuce("Software\lxl\lxl_program")
# 讀取實時寫入windows注冊表中的ip代理 本人喜歡使用無憂代理 不是打廣告,而是品質確實好
proxies = {"http": "%s" % item_fwq_ip["ip"], "https": "%s" % item_fwq_ip["ip"]}
headers = {
"user-agent": item_fwq_ip['user_agent']
}
try:
response = requests.get(url=url, headers=headers, timeout=20).content
if response:
return response
except Exception as f:
print("重新請求")
try:
response = requests.post(url=url, headers=headers, data=data,timeout=20).content
if response:
return response
except Exception as f:
print("重新請求")
3. 資料提取處理
??頁面請求成功之后,會回傳三種格式,一種是html格式,一種是json格式,還有一種是我請求不到資料回傳的無資料結果(未針對此處如何處理,若有需要,自行處理),針對html格式我們使用xpath決議資料(本來想著能不能通過代碼去自動處理xpath,太忙沒時間,以后補上吧);針對json格式,就簡單許多了,直接對應讀取出來即可,兩種格式處理之后,將資料以dict格式傳遞至資料保存處理中即可 見代碼:
def response_json(self, response, meta={}):
"""
json 格式決議
"""
list_data = response['result']['data']
for ds in list_data:
item = dict()
"""
此處可以對資料進行處理,若不需特殊處理的 則直接合并到item字典中,保存入資料庫
列: item["pid] = ds['id']
"""
item = {**item, **meta}
where_list = ["pid"] # 此處添加mysql保存判斷條件中查詢的欄位 可寫多個欄位
table_name = 'your_databases_tablename' # 此處添加你需要保存的資料表名稱 注: 若沒有新建資料表, 代碼可自動建立新的資料表
self.mysql_f_item(item, table_name=table_name, where_list=where_list)
def response_html(self, response, meta={}):
"""
html 格式決議
"""
list_response = response.xpath('//div[@class="name"]')
for resp in list_response:
item = dict()
"""
此處可以對資料進行xpath決議處理,保存入資料庫
列: item["pid] = resp.xpath('./a/@href')[0]
"""
print(item)
item = {**item, **meta}
where_list = ["pid"] # 此處添加mysql保存判斷條件中查詢的欄位 可寫多個欄位
table_name = "your_databases_tablename" # 此處添加你需要保存的資料表名稱 注: 若沒有新建資料表, 代碼可自動建立新的資料表
self.mysql_f_item(item, table_name, where_list=where_list)
4. 資料保存處理,
??資料庫選用mysql保存,在此模塊中,我加入了自動創建表和自動拼接sql的功能,傳入一個資料表名稱,若存在則進行下一步處理,不存在會進行資料表創建,此時dict中的欄位名稱就起到了一定的作用,我通過欄位中所帶的值,作為創建欄位的型別(此處也可自行添加);同時資料保存程序中,有時會需要做判重,通過在指定串列 where_list 中添加欄位即可(默認為空,不判重,其他的沒什么了都是一些常規操作了,見代碼:
sql = "insert into %s(" % table_name
for item in lst:
sql = sql + "`%s`," % item
sql = sql.strip(',') + ") values ("
if list_flag is False:
for item in lst:
sql = sql + "'{%s}'," % item
else:
for i in range(len(lst)):
sql = sql + "'{0[%s]}'," % i
sql = sql.strip(',') + ")"
return sql
sql_begin = """CREATE TABLE `%s` ( `id` int(11) NOT NULL AUTO_INCREMENT,""" % table_name
sql_end = """ PRIMARY KEY (`id`)
) ENGINE=%s AUTO_INCREMENT=0 DEFAULT CHARSET=%s;""" % (engine, charset)
sql_temp = " `%s` varchar(256) DEFAULT NULL,"
sql_temp_time = "`%s` datetime DEFAULT NULL,"
sql_temp_content = "`%s` text,"
sql_temp_sgin = "`%s` enum('0','1') DEFAULT '0',"
sql = str()
for item in lst:
# 生成新的資料表時 可根據item中的欄位名稱 來決定資料庫中欄位的型別
if "time" in item:
sql += sql_temp_time % item
elif "content" in item:
sql += sql_temp_content % item
elif "sgin" in item:
sql += sql_temp_sgin % item
else:
sql += sql_temp % (item)
sql = sql_begin + sql + sql_end
return sql
??好了,這次就寫到這里吧,如果之后對這個模塊做大的更新或調整再說吧, 如果對以上代碼有不懂之處,可以發送至郵件 xiang_long_liu@163.com,大家共同探討吧,
結尾處付全部代碼:
import requests, winreg, pymysql, re, json
from lxml import etree
from threading import Thread
import settings # 將服務器資料庫等鏈接方式寫入windows注冊表中,然后再在該程式中讀取出來
def read_setttings_zhuce(file, winn_c_u=winreg.HKEY_CURRENT_USER):
"""
讀取注冊表中的設定
"""
parentkey = winreg.OpenKey(winn_c_u, file)
# 獲取該鍵的所有鍵值,因為沒有方法可以獲取鍵值的個數,所以只能用這種方法進行遍歷
item = dict()
try:
i = 0
while True:
# EnumValue方法用來列舉鍵值,EnumKey用來列舉子鍵
name, value, type = winreg.EnumValue(parentkey, i)
# print(name, value)
item[name] = value
i += 1
except Exception as e:
pass
return item
class ALi_Main(Thread):
def read_setttings_zhuce(self, file, winn_c_u=winreg.HKEY_CURRENT_USER):
"""
讀取注冊表中的設定
"""
parentkey = winreg.OpenKey(winn_c_u, file)
# 獲取該鍵的所有鍵值,因為沒有方法可以獲取鍵值的個數,所以只能用這種方法進行遍歷
item = dict()
try:
i = 0
while True:
# EnumValue方法用來列舉鍵值,EnumKey用來列舉子鍵
name, value, type = winreg.EnumValue(parentkey, i)
item[name] = value
i += 1
except Exception as e:
pass
return item
def __init__(self, start_p):
# 注意,super().__init__() 一定要寫
# 而且要寫在最前面,否則會報錯,
super().__init__()
self.item_fwq = self.read_setttings_zhuce("Software\lxl\lxl_program")
# 鏈接資料庫
self.conn = pymysql.connect(
user=self.item_fwq["user"], password=self.item_fwq["password"], host=self.item_fwq["host"], port=int(self.item_fwq["port"]),
database=self.item_fwq["database"], use_unicode=True,
charset="utf8")
self.start_p = start_p
print("資料庫開啟中......")
# 獲取游標
self.cursor = self.conn.cursor()
def main(self, url="https://www.baidu.com/", formdata={}, meta={}):
"""
開關
"""
response = self.url_f_requests(url, formdata)
if response != "無結果":
# 對回傳的結果解碼
response = self.response_decode(response)
print(response)
response, fangshi = self.t_f_response_json_html(response)
if fangshi is "json":
self.response_json(response, meta)
elif fangshi is "html":
self.response_html(response, meta)
else:
print(fangshi)
print("回傳的頁面資料有誤請檢查")
else:
print("資料無結果,未獲取到")
def url_f_requests(self, url, formdata):
"""
get / post 請求發送
"""
if formdata == {}:
response = self.requests_url(url)
print("{INFO}:url以 get 方式請求")
# print(response)
else:
response = self.requests_url_post(url, formdata)
print("{INFO}:url以 post 方式請求")
# print(response)
return response
def t_f_response_json_html(self, response):
"""
判斷回傳的結果
"""
try:
response = json.loads(response)
print("{INFO}:資料以json格式回傳")
return response, "json"
except Exception as f:
try:
response = etree.HTML(response)
print("{INFO}:資料以html格式回傳")
return response, "html"
except Exception as f:
response = response
return response, "None"
def response_decode(self, response):
"""
對回傳的結果解碼
"""
try:
response = response.decode()
print("{INFO}:資料以utf-8解碼")
except Exception as f:
try:
response = response.decode("GBK")
print("{INFO}:資料以 GBK 解碼")
except Exception as f:
print("{INFO}:資料以未指定解碼方式回傳")
response = response
return response
def response_json(self, response, meta={}):
"""
json 格式決議
"""
list_data = response['result']['data']
for ds in list_data:
item = dict()
"""
此處可以對資料進行處理,若不需特殊處理的 則直接合并到item字典中,保存入資料庫
列: item["pid] = ds['id']
"""
item = {**item, **meta}
where_list = ["pid"] # 此處添加mysql保存判斷條件中查詢的欄位 可寫多個欄位
table_name = 'your_databases_tablename' # 此處添加你需要保存的資料表名稱 注: 若沒有新建資料表, 代碼可自動建立新的資料表
self.mysql_f_item(item, table_name=table_name, where_list=where_list)
def response_html(self, response, meta={}):
"""
html 格式決議
"""
list_response = response.xpath('//div[@class="name"]')
for resp in list_response:
item = dict()
"""
此處可以對資料進行xpath決議處理,保存入資料庫
列: item["pid] = resp.xpath('./a/@href')[0]
"""
print(item)
item = {**item, **meta}
where_list = ["pid"] # 此處添加mysql保存判斷條件中查詢的欄位 可寫多個欄位
table_name = "your_databases_tablename" # 此處添加你需要保存的資料表名稱 注: 若沒有新建資料表, 代碼可自動建立新的資料表
self.mysql_f_item(item, table_name, where_list=where_list)
def mysql_f_item(self, item, table_name="new_table_name", where_list=[]):
"""
保存創建mysql資料庫
"""
lst = item.keys()
# print(lst)
insert_sql = self.create_insert_sql_for_list(table_name=table_name, lst=lst)
insert_sql = insert_sql.format(**item)
# print(insert_sql)
select_sql = self.create_select_sql(table_name=table_name, where_list=where_list)
select_sql = select_sql.format(**item)
# print(select_sql)
self.insert_mysql(insert_sql=insert_sql, select_sql=select_sql, table_name=table_name, lst=lst)
print("--------------------------------")
def create_insert_sql_for_list(self, table_name, lst, list_flag=False):
"""
動態生成sql文(單條)
:param table_name:表名
:param lst:插入的資料串列
:param list_flag: true:代表lst欄位是 list嵌套list, false:代表list嵌套dict
:return:回傳單條插入的sql
"""
sql = "insert into %s(" % table_name
for item in lst:
sql = sql + "`%s`," % item
sql = sql.strip(',') + ") values ("
if list_flag is False:
for item in lst:
sql = sql + "'{%s}'," % item
else:
for i in range(len(lst)):
sql = sql + "'{0[%s]}'," % i
sql = sql.strip(',') + ")"
return sql
def create_select_sql(self, table_name, where_list):
"""
動態生成sql文
"""
if where_list == []:
return ""
else:
sql = 'select id from %s where' % table_name
for i in range(len(where_list)):
sql = sql + " `%s` = '{%s}' and " % (where_list[i], where_list[i])
sql = sql.strip('and ')
# print(sql)
return sql
def create_table(self, table_name, lst, engine='MyISAM', charset='utf8'):
"""
生成建表sql
:param table_name:表名
:param lst:欄位串列
:param engine:資料庫型別
:param charset:字符集
:return:sql
"""
sql_begin = """CREATE TABLE `%s` ( `id` int(11) NOT NULL AUTO_INCREMENT,""" % table_name
sql_end = """ PRIMARY KEY (`id`)
) ENGINE=%s AUTO_INCREMENT=0 DEFAULT CHARSET=%s;""" % (engine, charset)
sql_temp = " `%s` varchar(256) DEFAULT NULL,"
sql_temp_time = "`%s` datetime DEFAULT NULL,"
sql_temp_content = "`%s` text,"
sql_temp_sgin = "`%s` enum('0','1') DEFAULT '0',"
sql = str()
for item in lst:
# 生成新的資料表時 可根據item中的欄位名稱 來決定資料庫中欄位的型別
if "time" in item:
sql += sql_temp_time % item
elif "content" in item:
sql += sql_temp_content % item
elif "sgin" in item:
sql += sql_temp_sgin % item
else:
sql += sql_temp % (item)
sql = sql_begin + sql + sql_end
return sql
def insert_mysql(self, insert_sql, select_sql='', update_sql='', table_name='', lst=()):
"""
保存資料
"""
while True:
# 獲取游標
self.conn.ping(reconnect=True)
if select_sql:
try:
self.cursor.execute(select_sql)
if self.cursor.fetchone() is None:
print(insert_sql)
try:
self.cursor.execute(insert_sql)
self.conn.commit()
print("資料保存中......")
if update_sql:
self.cursor.execute(update_sql)
self.conn.commit()
print("資料更新中......")
break
except Exception as f:
# print(insert_sql)
print(f)
print("資料保存失敗")
break
else:
print("資料已存在")
break
except Exception as f:
print(f)
# 首次執行 創建一個新的資料表
if "Table" in str(f) and "doesn't exist" in str(f):
print("*" * 100)
print("創建資料庫中......")
sql = self.create_table(table_name=table_name, lst=lst)
self.cursor.execute(sql)
self.conn.commit()
else:
break
else:
try:
print(insert_sql)
print("資料保存中......")
self.cursor.execute(insert_sql)
self.conn.commit()
break
except Exception as f:
print(f)
# 首次執行 創建一個新的資料表
if "Table" in str(f) and "doesn't exist" in str(f):
print("*" * 100)
print("創建資料庫中......")
sql = self.create_table(table_name=table_name, lst=lst)
self.cursor.execute(sql)
self.conn.commit()
else:
print("保存失敗")
break
def getDropStr(self, l_strHtml):
"""清洗字串"""
strList = re.findall(
r'[\u4e00-\u9fa5a-zA-Z0-9,.;?!_\]\'\"\[{}+-\u2014\u2026\uff1b\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b]',
l_strHtml)
return "".join(strList)
def requests_url(self, url, data=None):
"""
發送請求,回傳相應
"""
item_fwq_ip = read_setttings_zhuce("Software\lxl\lxl_program")
# 讀取實時寫入windows注冊表中的ip代理 本人喜歡使用無憂代理 不是打廣告,而是品質確實好
proxies = {"http": "%s" % item_fwq_ip["ip"], "https": "%s" % item_fwq_ip["ip"]}
headers = {
"user-agent": item_fwq_ip['user_agent']
}
try:
response = requests.get(url=url, headers=headers, timeout=20).content
if response:
return response
except Exception as f:
print("重新請求")
i = 0
while True:
i += 1
if i >= 5:
return "無結果"
try:
response = requests.get(url=url, headers=headers, proxies=proxies, timeout=20).content
if response:
return response
except Exception as f:
print("重新請求")
def requests_url_post(self, url, data):
"""
發送請求,回傳相應
"""
item_fwq_ip = read_setttings_zhuce("Software\lxl\lxl_program")
# 讀取實時寫入windows注冊表中的ip代理 本人喜歡使用無憂代理 不是打廣告,而是品質確實好
proxies = {"http": "%s" % item_fwq_ip["ip"], "https": "%s" % item_fwq_ip["ip"]}
headers = {
"user-agent": item_fwq_ip['user_agent']
}
try:
response = requests.post(url=url, headers=headers, data=data,timeout=20).content
if response:
return response
except Exception as f:
print("重新請求")
i = 0
while True:
i += 1
if i >= 5:
return "無結果"
try:
response = requests.post(url=url, headers=headers, data=data, proxies=proxies, timeout=20).content
if response:
return response
except Exception as f:
print("重新請求")
def __del__(self):
self.cursor.close()
self.conn.close()
print("資料庫關閉中......")
def main_thread(number_p):
"""
多執行緒啟動
若使用多執行緒爬取是 將 main 函式改為 run 函式 傳遞引數控制url使用個數從而決定多執行緒條數
"""
print("多執行緒啟動程式")
list_thread = list()
for p in range(0, number_p+1000, 1000):
thread = ALi_Main(p)
list_thread.append(thread)
for threads in list_thread:
threads.start()
for threads in list_thread:
threads.join()
if __name__ == '__main__':
# 初始化
# settings.main()
alm = ALi_Main(0)
meta = dict()
meta["key_name"] = "傳值"
url = "https://search.sina.com.cn/?range=title&q=" + str(meta["key_name"]) + "&c=news&time=&ie=utf-8&col=&source=&from=&country=&size=&a=&page=1&pf=0&ps=0&dpc=1"
print(url)
alm.main(url=url, meta=meta)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/185012.html
標籤:其他
