架構

(架構圖源于參考書籍)
Elasticsearch 環境搭建
官方簡介:Elasticsearch 是一個分布式、RESTful 風格的搜索和資料分析引擎,能夠解決不斷涌現出的各種用例, 作為 Elastic Stack 的核心,它集中存盤您的資料,幫助您發現意料之中以及意料之外的情況,
下載安裝
# 下載
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.15.1-linux-x86_64.tar.gz
# 解壓
tar xzvf elasticsearch-7.15.1-linux-x86_64.tar.gz
# 以非 root 用戶啟動
cd /elasticsearch-7.15.1/bin/
./elasticsearch

# 檢驗是否啟動成功,172.16.16.4 為 elasticsearch.yml 配置系結的 IP 地址
curl 172.16.16.4:9200

若無法正常啟動,則修改配置:
/home/sam/elasticsearch-7.15.1/config
修改 jvm.options 中記憶體配置:
-Xms256m
-Xmx256m
修改 vim elasticsearch.yml :
cluster.name: my-application
node.name: node-1
network.host: 172.16.16.4
http.port: 9200
discovery.seed_hosts: ["172.26.26.4", "::1"]
cluster.initial_master_nodes: ["node-1"]
配置開機啟動
# 創建開機啟動檔案
vim /usr/lib/systemd/system/elasticsearch.service
內容如下:
[Unit]
Description=elasticsearch
[Service]
User=sam #啟動用戶
LimitNOFILE=100000
LimitNPROC=100000
ExecStart=/home/sam/elasticsearch-7.15.1/bin/elasticsearch #安裝路徑
[Install]
WantedBy=multi-user.target
# 重新加載檔案配置
systemctl daemon-reload
# 設定開機啟動
systemctl enable elasticsearch
# 關掉之前啟動的 es
lsof -i tcp:9200
kill -9 pid
# 啟動 es
systemctl start elasticsearch

開啟遠程連接
# 放行埠
iptables -I INPUT 4 -p tcp -m state --state NEW -m tcp --dport 9200 -j ACCEPT
# 保存 iptables 規則
service iptables save
# 遠程測驗
curl 公網IP:9200

elasticsearch-head 插件
# github 地址
https://github.com/mobz/elasticsearch-head
# npm 啟動方式
git clone git://github.com/mobz/elasticsearch-head.git
cd elasticsearch-head
npm install
npm run start
open http://localhost:9100/
# 搭建 node 環境
# 查看當前有那些可供選擇的版本
# dnf module list node.js
# 選擇一個版本
# dnf module edable nodejs:14
# 安裝 nodejs
dnf install nodejs
# 查看當前的版本
node --version
npm --version

# 進入目錄并安裝
cd elasticsearch-head
npm install

vim elasticsearch.yml
# 配置跨域
http.cors.enabled: true
http.cors.allow-origin: "*"
# 重啟 es 服務
sudo systemctl restart elasticsearch.service

# 放行埠
iptables -I INPUT 4 -p tcp -m state --state NEW -m tcp --dport 9100 -j ACCEPT
# 保存 iptables 規則
service iptables save
# 啟動
npm run start
遠程訪問:

創建映射
創建 metadata 索引以及 objects 型別的映射:
curl -H "Content-Type: application/json" -XPUT 172.16.16.4:9200/metadata?include_type_name=true -d'{"mappings":{"objects":{"properties":{"name":{"type":"text","index":"false"},"version":{"type":"integer"},"size":{"type":"integer"},"hash":{"type":"text"}}}}}'
ES包封裝
該 ES 包封裝了以 HTTP 訪問 ES 的各種 API 的操作,
package es
/* 該 ES 包封裝了以 HTTP 訪問 ES 的各種 API 的操作 */
import (
"demo/sys"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
)
/* 元資料結構體 */
type Metadata struct {
Name string
Version int
Size int64
Hash string
}
type hit struct {
Source Metadata `json:"_source"`
}
type searchResult struct {
Hits struct {
Total int
Hits []hit
}
}
/*根據物件的名稱和版本號來獲取元資料*/
func getMetadata(name string, versionId int) (meta Metadata, e error) {
// 索引為 metadata ,型別為 objects,檔案 id 為物件名稱和版本號的拼接
url := fmt.Sprintf(sys.GetMetadataUrl, os.Getenv(sys.EsServer), name, versionId)
// 通過 GET URL 可以直接獲取該物件的元資料,免除了耗時的搜索操作
r, e := http.Get(url)
if e != nil {
return
}
if r.StatusCode != http.StatusOK {
e = fmt.Errorf(sys.FailToGetMetadata, name, versionId, r.StatusCode)
return
}
result, _ := ioutil.ReadAll(r.Body)
// 將請求結果反序列化為元資料結構
json.Unmarshal(result, &meta)
return
}
/*根據物件名稱獲取最新版本的元資料*/
func SearchLatestVersion(name string) (meta Metadata, e error) {
// 構建 url 時需要將名稱轉移成 url 字符
url := fmt.Sprintf(sys.SearchLatestVersionUrl, os.Getenv(sys.EsServer), url.PathEscape(name))
r, e := http.Get(url)
if e != nil {
return
}
if r.StatusCode != http.StatusOK {
e = fmt.Errorf(sys.FailToSearchLatestMetadata, r.StatusCode)
return
}
result, _ := ioutil.ReadAll(r.Body)
var sr searchResult
// 請求結果反序列化
json.Unmarshal(result, &sr)
// 如果長度為 0 則沒有搜索結果,直接回傳
if len(sr.Hits.Hits) != 0 {
meta = sr.Hits.Hits[0].Source
}
return
}
/*根據物件的名稱和版本號來獲取元資料*/
func GetMetadata(name string, version int) (Metadata, error) {
// 沒有指定版本號時默認回傳最新版本的元資料
if version == 0 {
return SearchLatestVersion(name)
}
return getMetadata(name, version)
}
/*向 ES 服務上傳一個新的元資料*/
func PutMetadata(name string, version int, size int64, hash string) error {
doc := fmt.Sprintf(sys.MetadataJson, name, version, size, hash)
client := http.Client{}
url := fmt.Sprintf(sys.PutMetadataUrl, os.Getenv(sys.EsServer), name, version)
request, _ := http.NewRequest(http.MethodPut, url, strings.NewReader(doc))
r, e := client.Do(request)
if e != nil {
return e
}
if r.StatusCode == http.StatusConflict {
return PutMetadata(name, version+1, size, hash)
}
if r.StatusCode != http.StatusCreated {
result, _ := ioutil.ReadAll(r.Body)
return fmt.Errorf(sys.FailToPutMetadata, r.StatusCode, string(result))
}
return nil
}
/*版本號加一*/
func AddVersion(name, hash string, size int64) error {
// 獲取目前最新的版本
version, e := SearchLatestVersion(name)
if e != nil {
return e
}
// 創建一個最新的版本號
return PutMetadata(name, version.Version+1, size, hash)
}
/*搜索物件的全部版本*/
func SearchAllVersions(name string, from, size int) ([]Metadata, error) {
// 不指定名字時則搜索全部物件的全部版本,指定名字時則搜索某個物件的全部版本
url := fmt.Sprintf(sys.SearchAllVersionsUrl, os.Getenv(sys.EsServer), from, size)
if name != "" {
url += "&q=name:" + name
}
r, e := http.Get(url)
if e != nil {
return nil, e
}
metas := make([]Metadata, 0)
result, _ := ioutil.ReadAll(r.Body)
var sr searchResult
json.Unmarshal(result, &sr)
for i := range sr.Hits.Hits {
metas = append(metas, sr.Hits.Hits[i].Source)
}
return metas, nil
}
/*洗掉指定的版本*/
func DelMetadata(name string, version int) {
client := http.Client{}
url := fmt.Sprintf(sys.DelMetadataUrl, os.Getenv(sys.EsServer), name, version)
request, _ := http.NewRequest(http.MethodDelete, url, nil)
client.Do(request)
}
type Bucket struct {
Key string
Doc_count int
Min_version struct {
Value float32
}
}
type aggregateResult struct {
Aggregations struct {
Group_by_name struct {
Buckets []Bucket
}
}
}
/*搜索版本狀態*/
func SearchVersionStatus(min_doc_count int) ([]Bucket, error) {
client := http.Client{}
url := fmt.Sprintf(sys.SearchVersionStatusUrl, os.Getenv(sys.EsServer))
body := fmt.Sprintf(sys.SearchVersionStatusJson, min_doc_count)
request, _ := http.NewRequest(http.MethodGet, url, strings.NewReader(body))
r, e := client.Do(request)
if e != nil {
return nil, e
}
b, _ := ioutil.ReadAll(r.Body)
var ar aggregateResult
json.Unmarshal(b, &ar)
return ar.Aggregations.Group_by_name.Buckets, nil
}
func HasHash(hash string) (bool, error) {
url := fmt.Sprintf(sys.HasHashUrl, os.Getenv(sys.EsServer), hash)
r, e := http.Get(url)
if e != nil {
return false, e
}
b, _ := ioutil.ReadAll(r.Body)
var sr searchResult
json.Unmarshal(b, &sr)
return sr.Hits.Total != 0, nil
}
func SearchHashSize(hash string) (size int64, e error) {
url := fmt.Sprintf(sys.SearchHashSizeUrl, os.Getenv(sys.EsServer), hash)
r, e := http.Get(url)
if e != nil {
return
}
if r.StatusCode != http.StatusOK {
e = fmt.Errorf(sys.FailToSearchHashSize, r.StatusCode)
return
}
result, _ := ioutil.ReadAll(r.Body)
var sr searchResult
json.Unmarshal(result, &sr)
if len(sr.Hits.Hits) != 0 {
size = sr.Hits.Hits[0].Source.Size
}
return
}
版本資訊搜索
package version
import (
"demo/es"
"encoding/json"
"log"
"net/http"
"strings"
)
/*處理版本搜索*/
func Handler(w http.ResponseWriter, r *http.Request) {
// 非 GET 方法時回應方法不允許
m := r.Method
if m != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// 其實是分頁引數,一頁最多有 1000 條記錄,默認從第 0 條開始往后取資料
// 當回傳值的長度不等于 1000 時,則說明后續沒有資料了,直接回傳
// 當回傳值等于 1000 時,說明后續可能有資料, from 則從 1000 條開始往后取資料
from := 0
size := 1000
// 若未指定名字,則切割 URL 之后名字為空字串
name := strings.Split(r.URL.EscapedPath(), "/")[2]
for {
metas, e := es.SearchAllVersions(name, from, size)
if e != nil {
log.Println(e)
// 服務器內部錯誤
w.WriteHeader(http.StatusInternalServerError)
return
}
// 遍歷結果集
for i := range metas {
// 格式化為 json 回傳
b, _ := json.Marshal(metas[i])
w.Write(b)
w.Write([]byte("\n"))
}
if len(metas) != size {
return
}
from += size
}
}
散列(hash)工具類封裝
package utils
import (
"crypto/sha256"
"encoding/base64"
"io"
"net/http"
"strconv"
"strings"
)
/*從 header 獲取偏移量*/
func GetOffsetFromHeader(h http.Header) int64 {
byteRange := h.Get("range")
if len(byteRange) < 7 {
return 0
}
if byteRange[:6] != "bytes=" {
return 0
}
bytePos := strings.Split(byteRange[6:], "-")
offset, _ := strconv.ParseInt(bytePos[0], 0, 64)
return offset
}
/*從 header 獲取散列值*/
func GetHashFromHeader(h http.Header) string {
digest := h.Get("digest")
if len(digest) < 9 {
return ""
}
if digest[:8] != "SHA-256=" {
return ""
}
return digest[8:]
}
/*從 header 獲取內容長度*/
func GetSizeFromHeader(h http.Header) int64 {
size, _ := strconv.ParseInt(h.Get("content-length"), 0, 64)
return size
}
/*計算散列值*/
func CalculateHash(r io.Reader) string {
h := sha256.New()
io.Copy(h, r)
return base64.StdEncoding.EncodeToString(h.Sum(nil))
}
PUT、GET、DELETE 處理函式
package objects
import (
"demo/apiServer/heartbeat"
"demo/apiServer/locate"
"demo/apiServer/objectStream"
"demo/es"
"demo/sys"
"demo/utils"
"fmt"
"io"
"log"
"net/http"
"net/url"
"strconv"
"strings"
)
/*介面服務的 PUT 和 GET 請求是將 HTTP 請求轉發到資料服務,實際上是呼叫資料服務的 PUT 和 GET 方法*/
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
// PUT 方法時,創建或者替換資源
if m == http.MethodPut {
put(w, r)
return
}
// GET 方法時,獲取資源
if m == http.MethodGet {
get(w, r)
return
}
// 版本洗掉
if m == http.MethodDelete {
del(w,r)
return
}
// 其他方式時,回傳狀態碼,方法不允許
w.WriteHeader(http.StatusMethodNotAllowed)
}
/*處理介面服務 PUT 請求*/
func put(w http.ResponseWriter, r *http.Request) {
// 按以前的步驟,這里應該獲取存盤物件名字,不過從 header 中取物件的散列值作為名字
hash := utils.GetHashFromHeader(r.Header)
if hash == "" {
log.Println(sys.MissingObjectHash)
w.WriteHeader(http.StatusBadRequest)
return
}
// 存盤請求資料,散列值要作轉義
httpStatus, e := storeObject(r.Body, url.PathEscape(hash))
if e != nil {
log.Println(e)
w.WriteHeader(httpStatus)
return
}
if httpStatus != http.StatusOK {
w.WriteHeader(httpStatus)
return
}
// 獲取名字和大小,新增一個物件版本
name := strings.Split(r.URL.EscapedPath(), "/")[2]
size := utils.GetSizeFromHeader(r.Header)
e = es.AddVersion(name, hash, size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
}
// 回傳結果
w.WriteHeader(httpStatus)
}
func storeObject(r io.Reader, obj string) (int, error) {
// 獲取介面服務節點存盤物件的流
stream, e := putStream(obj)
if e != nil {
return http.StatusServiceUnavailable, e
}
// 將請求資料體拷貝到流 stream
io.Copy(stream, r)
// 關閉流
e = stream.Close()
if e != nil {
return http.StatusInternalServerError, e
}
// 回傳成功狀態碼
return http.StatusOK, nil
}
func putStream(obj string) (*objectStream.PutStream, error) {
// 隨機選擇一個資料服務節點
server := heartbeat.ChooseRandomDataServer()
// 若沒有可用的資料服務節點則回傳錯誤資訊
if server == "" {
return nil, fmt.Errorf(sys.DataServerNotFound)
}
// 回傳資料服務節點存盤物件的流
return objectStream.NewPutStream(server, obj), nil
}
/*處理介面服務 GET 請求*/
func get(w http.ResponseWriter, r *http.Request) {
// 獲取存盤物件名稱和版本號
name := strings.Split(r.URL.EscapedPath(), "/")[2]
versionId := r.URL.Query()["version"]
version := 0
var e error
if len(versionId) != 0 {
// 版本號字串轉數字
version, e = strconv.Atoi(versionId[0])
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusBadRequest)
return
}
}
// 根據名字和版本號來獲取元資料
meta, e := es.GetMetadata(name, version)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 元資料散列值為空則無該物件
if meta.Hash == "" {
w.WriteHeader(http.StatusNotFound)
return
}
// 散列值要作 URL 轉移
object := url.PathEscape(meta.Hash)
// 根據散列值獲取物件資料
stream, e := getStream(object)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
// 將資料流拷貝到回應流 w
io.Copy(w, stream)
}
func getStream(obj string) (io.Reader, error) {
// 根據存盤物件名稱進行定位
server := locate.Locate(obj)
// 未找到該存盤物件時回傳定位失敗錯誤
if server == "" {
return nil, fmt.Errorf(sys.DataServerLocateFail, obj)
}
// 定位到存盤物件時,回傳該物件的資料流
return objectStream.NewGetStream(server, obj)
}
/*處理介面服務 DELETE 請求*/
func del(w http.ResponseWriter, r *http.Request) {
// 獲取名字
name := strings.Split(r.URL.EscapedPath(),"/")[2]
v,e := es.SearchLatestVersion(name)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 插入一條新的元資料作洗掉標記
e = es.PutMetadata(name,v.Version + 1,0,"")
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
測驗
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/336186.html
標籤:其他

