
首先,python 多執行緒不能充分利用多核CPU的計算資源(只能共用一個CPU),所以得用多行程,筆者從3.7億資料的索引,取200多萬的資料,從取資料到構造pandas dataframe總共大概用時14秒左右,每個分片用一個行程查詢資料,最后拼接出完整的結果,
由于回傳的json資料量較大,每次100多萬到200多萬,如何快速根據json構造pandas 的dataframe是個問題 — 筆者測驗過read_json()、json_normalize()、DataFrame(eval(pandas_json))及DataFrame.from_dict(),from_dict()速度最快
轉載請注明出處:https://www.cnblogs.com/NaughtyCat/p/how-to-get-all-results-from-es-by-scroll-python-version.html
- Elasticsearch scroll取資料— python版
原始碼如下:
def es_scroll(index, min_timestamp, max_timestamp, slice_no): es = Elasticsearch('http://localhost:9200', timeout = 30, max_retries=10, retry_on_timeout=True) page = es.search( index = index, doc_type = "tls_book", scroll = '1m', body={ "slice": { "id": slice_no, "max": SLICES }, "_source": [ "SrcIP" ], "sort": [ "_doc" ], "query": { "range" : { "@timestamp" : { "gte" : min_timestamp, "lte" : max_timestamp, "boost" : 2.0 } } } }, version = False, size = 10000) sid = page['_scroll_id'] scroll_size = page['hits']['total'] # Start scrolling df = pd.DataFrame() appended_data = [] while (scroll_size > 0): frame = pd.DataFrame.from_dict([document['_source'] for document in page["hits"]["hits"]]) appended_data.append(frame) page = es.scroll(scroll_id = sid, scroll = '1m', request_timeout = 30) # Update the scroll ID sid = page['_scroll_id'] # Get the number of results that we returned in the last scroll scroll_size = len(page['hits']['hits']) if len(appended_data) > 0: df = pd.concat(appended_data, ignore_index=True, sort = False) del appended_data gc.collect() es.clear_scroll(body={'scroll_id': sid}) return df
注:
(1)通過 "_source" 關鍵字,指定要取的欄位,可減少不必要的欄位,提高查詢速度
(2)官方檔案指出,通過 "sort": [ "_doc"] —即按照_doc排序,可提高查詢效率
(3)根據自己的環境,測驗合理的 size ,效率會有數倍的差距,筆者環境(128G, 32核)一次取10000性能最好,網上大多測驗,size取2000或者1000似乎較佳
(4)clear_scroll及時清理用完的scroll_id
(5)如果資料量較大,設定超時和重試次數(默認是10秒,否則超時會取不到資料),具體如下
timeout = 30, max_retries=10, retry_on_timeout=True
(6)Sliced scroll
如果回傳的資料量特別大,可通過slice讓多個分片獨自來處理請求,如下(id從0開始):
"slice": {
"id": slice_no,
"max": SLICES
},
參考: https://www.elastic.co/guide/en/elasticsearch/reference/5.1/search-request-scroll.html#sliced-scroll
- python 多行程如何個函式傳多個引數
python多行程或者多執行緒要向呼叫的函式傳遞多個引數,需要構造引數元組集合,代碼如下(本示例每個行程不同的只有es的slice_id):
def build_parameters(index, min_timestamp, max_timestamp): parmeters =[] for num in range(0, SLICES): tuple_paremeter = (index, min_timestamp, max_timestamp, num) parmeters.append(tuple_paremeter) return parmeters
- python多行程實體
示例使用行程池,及starmap 傳遞呼叫的函式及引數 (with相當于try, excepion, finallly的集合,會自動做資源的釋放或關閉等)
with multiprocessing.Pool(processes = SLICES) as pool:
result = pool.starmap(es_scroll, parameters)
然后,拼接回傳的dataframe 集合即可構造一個完整的dataframe,如下:
frame = pd.concat(result, ignore_index=True, sort = False)
*******************************************************************************************
精力有限,想法太多,專注做好一件事就行
- 我只是一個程式猿,5年內把代碼寫好,技術博客字字推敲,堅持零拷貝和原創
- 寫博客的意義在于打磨文筆,訓練邏輯條理性,加深對知識的系統性理解;如果恰好又對別人有點幫助,那真是一件令人開心的事
*******************************************************************************************
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/105977.html
標籤:Python
上一篇:全國大學生資訊安全競賽三等獎virusTotal論文展示
下一篇:apktool重新打包添加簽名
