擴展redis cluster addslots 命令
眾所周知,redis集群集群模式下必須將16384個指派完,也就是16384個槽點都在處理時,集群才處于上線上線狀態,在指派槽時用的命令是這樣的 “cluster addslots 1 2 3” 這樣,這里有個不好的地方就是每次添加只能一個槽點一個槽點這樣添加,著實不方便,我想擴展一下redis的這個命令,令他支持""cluster addslots {1…5000} "這樣的語法
我們先看一下redis中 處理“ cluster addslots”命令的相關的代碼,在 cluster.c的clusterCommand方法中,
具體代碼如下(為了便于閱讀我會添加一些中文注釋以便于讀者理解):
else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
!strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
{ //如果是"cluster addslots或者 cluster "delslots命令
/* CLUSTER ADDSLOTS <slot> [slot] ... */
/* CLUSTER DELSLOTS <slot> [slot] ... */
int j, slot;
unsigned char *slots = zmalloc(CLUSTER_SLOTS); //一個陣列,記錄所有要添加或者洗掉的槽 slots[519] == 1 代表 519槽位置要洗掉或者添加
int del = !strcasecmp(c->argv[1]->ptr,"delslots"); //是否是洗掉命令
memset(slots,0,CLUSTER_SLOTS); //常規操作
/* Check that all the arguments are parseable and that all the
* slots are not already busy. */
/*遍歷引數串列 比如cluster addslots 1 2 3 4就是遍歷 1 2 3 4 */
for (j = 2; j < c->argc; j++) {
if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
zfree(slots);
return;
}
if (del && server.cluster->slots[slot] == NULL) {
addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
zfree(slots);
return;
} else if (!del && server.cluster->slots[slot]) {
addReplyErrorFormat(c,"Slot %d is already busy", slot);
zfree(slots);
return;
}
if (slots[slot]++ == 1) {
addReplyErrorFormat(c,"Slot %d specified multiple times",
(int)slot);
zfree(slots);
return;
}
}
//處理所有輸入 slot
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (slots[j]) {
int retval;
/* If this slot was set as importing we can clear this
* state as now we are the real owner of the slot. */
if (server.cluster->importing_slots_from[j])
server.cluster->importing_slots_from[j] = NULL;
retval = del ? clusterDelSlot(j) :
clusterAddSlot(myself,j);
serverAssertWithInfo(c,NULL,retval == C_OK);
}
}
zfree(slots);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
}
然后改寫:
else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
!strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
{
int j;
int del = !strcasecmp(c->argv[1]->ptr,"delslots");
for (j = 2; j < c->argc; j++) {
addOrDelSlots(c,c->argv[j],del);
}
}
void addOrDelSlots(redisClient *c,robj *o,int del){
int j;
char *arg = o->ptr;
// 一個陣列,記錄所有要添加或者洗掉的槽
unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
// 將 slots 陣列的所有值設定為 0
memset(slots,0,REDIS_CLUSTER_SLOTS);
if (strstr(arg,"{") == arg && strrchr(arg,'}') == (arg + strlen(arg) - 1)) //是否以"{"開頭"}"結尾 這是區間添加引數如"{0...1000}"
{
char *ellipsisIndex; //省略號下標即"..."
char startNum[6], endNum[6],startNumTemp[6],endNumTemp[6]; //because of slotNums maximum is "16384"
int start, end;
memset(startNum,0,6);
memset(endNum,0,6);
memset(startNumTemp,0,6);
memset(endNumTemp,0,6);
if( (ellipsisIndex = strstr(arg,"...")) == NULL){ //客戶端格式輸入錯誤
addReplyErrorFormat(c,"cluster addslots format error %d", ellipsisIndex);
return;
}
memcpy(startNum,arg + 1,ellipsisIndex - arg - 1); //填充startNum
memcpy(endNum,ellipsisIndex + 3,strlen(ellipsisIndex + 3) - 1); //填充endNum
start = atoi(startNum);
end = atoi(endNum);
sprintf(startNumTemp,"%d",start);
sprintf(endNumTemp,"%d",end);
if (strcmp(startNum,startNumTemp) || strcmp(endNum,endNumTemp))
{
addReplyErrorFormat(c,"cluster addslots format error");
return;
}
if(start <= -1 || end >= REDIS_CLUSTER_SLOTS || start > end){
addReplyErrorFormat(c,"cluster addslots out of the");
return;
}
for(;start <= end; start++)
slots[start] = 1;
}else{
int slot;
// 獲取 slot 數字
if ( (slot = getSlotOrReply(c,o)) == - 1) {
zfree(slots);
return;
}
slots[slot] = 1;
}
// 處理所有輸入 slot
for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
if (slots[j]) {
int retval;
if (del && server.cluster->slots[j] == NULL) {
addReplyErrorFormat(c,"Slot %d is already unassigned", j);
zfree(slots);
return;
}else if (!del && server.cluster->slots[j]) { // 如果這是 addslots 命令,并且槽已經有節點在負責,那么回傳一個錯誤
addReplyErrorFormat(c,"Slot %d is already busy", j);
zfree(slots);
return;
}
/* If this slot was set as importing we can clear this
* state as now we are the real owner of the slot. */
// 如果指定 slot 之前的狀態為載入狀態,那么現在可以清除這一狀態
// 因為當前節點現在已經是 slot 的負責人了
if (server.cluster->importing_slots_from[j])
server.cluster->importing_slots_from[j] = NULL;
// 添加或者洗掉指定 slot
retval = del ? clusterDelSlot(j) :
clusterAddSlot(myself,j);
redisAssertWithInfo(c,NULL,retval == REDIS_OK);
}
}
zfree(slots);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
}
這樣就可以支持范圍添加了像這樣:cluster addslots {0…100}了,這段代碼沒有太大的把握保證沒有bug(保不準可能各種出現段錯誤),我只用過一些測驗案例進行測驗過,貌似還行,這段代碼質量和風格其實不怎么樣,可能以后我會改寫優化這段代碼,或許吧,
之后的時間里我會寫一些關于redis,JDK,和一些開源框架的文章,全都是原始碼級別的,從原始碼層面搞懂原理,從原始碼中學習資料結構,設計模式,演算法,
不定期的佛性的更新,不過之后的文章中我肯定會用心寫,不會像這篇文章一樣任性,“行云流水”
~~
我曾踏月而來,只因你在山中

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/276295.html
標籤:其他
