主頁 > 後端開發 > 用Java寫一個分布式快取——RESP服務端

用Java寫一個分布式快取——RESP服務端

2023-02-09 06:58:42 後端開發

前言

本篇我們將完成一個RESP的socket的服務端,初步完成一個單機版快取,

另外在其中我們還需要完成命令的動態路由

原始碼:https://github.com/weloe/Java-Distributed-Cache

本篇代碼:

https://github.com/weloe/Java-Distributed-Cache/tree/master/src/main/java/com/weloe/cache/server

上篇:快取管理 https://www.cnblogs.com/weloe/p/17068891.html

RESP協議

RESP協議支持5種資料型別:字串,例外,整數,多行字串,陣列

資料型別由第一個位元組進行區分,不同部分使用\r\n來分開

  • '+' : 簡單字串
  • '-' : 例外
  • ':' : 整數
  • '$': 多行字串
  • '*': 陣列

簡單字串一般用來回傳該操作無誤,操作成功,例如回傳+ok\r\n

例外: -error msg\r\n

整數:: 1\r\n

多行字串:

$4\r\n
test

這里的4是實際資料的長度

實際資訊為 test

陣列:

*2\r\n
$2\r\n
ab\r\n
$3\r\n
cde\r\n

資訊為字串陣列[ab][cde]

實作

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/util/RESPUtil.java

根據協議我們就能撰寫出對請求資訊的決議類了,我們把它抽象為一個工具類

對回傳內容決議的代碼

	/**
     * 讀取RESP協議的位元組,轉為String
     *
     * @return String
     */
    public static String parseRESPBytes(InputStream inputStream) {
        byte[] bytes;

        String result = null;
        try {
            while (inputStream.available() == 0) {
            }
            bytes = new byte[inputStream.available()];
            inputStream.read(bytes);
            result = new String(bytes);

        } catch (IOException e) {
            e.printStackTrace();
        }

        return result;
    }

    /**
     * 決議RESP協議的String
     *
     * @param raw
     * @return
     */
    public static Object parseRESPString(String raw) {
        byte type = raw.getBytes()[0];
        String result = raw.substring(1);
        switch (type) {
            case '+':
                // +ok\r\n
                // 讀單行
                return result.replace("\r\n", "");
            case '-':
                // 例外
                // -Error msg\r\n
                throw new RuntimeException(result.replace("\r\n", ""));
            case ':':
                // 數字
                return result.replace("\r\n", "");
            case '$':
                return result.split("\r\n")[1];
            case '*':
                // 多行字串
                String[] strList = result.substring(result.indexOf("$")).split("\r\n");
                System.out.print("多條批量請求:");
                List<String> list = new LinkedList<>();
                for (int i = 1; i < strList.length; i += 2) {
                    System.out.print(strList[i] + " ");
                    list.add(strList[i]);
                }
                System.out.println();
                return list;
            default:
                throw new RuntimeException("錯誤的資料格式");
        }
    }

發送請求的代碼

    /**
     * 發送RESP請求
     * @param host
     * @param port
     * @param args
     * @return
     * @throws IOException
     */
    public static byte[] sendRequest(String host,Integer port, String ... args) throws IOException {
        Socket socket = new Socket(host,port);
        RESPRequest request = new RESPRequest(socket);
        PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
        sendRequest(writer,args);

        byte[] param = request.getBytes();

        request.close();
        writer.close();
        socket.close();
        return param;
    }

    /**
     * 發送RESP請求
     * @param writer
     * @param args
     * @throws IOException
     */
    public static void sendRequest(PrintWriter writer, String ... args) throws IOException {
        writer.println("*"+args.length);
        for (String arg : args) {
            writer.println("$"+arg.getBytes(StandardCharsets.UTF_8).length);
            writer.println(arg);
        }
        writer.flush();
    }

命令的決議

通過RESP協議的實作,我們就能做到對我們的快取發送命令,那么我們又該怎么對命令進行決議,然后做出對應的操作呢?

這里我們使用前綴樹結構來進行動態路由

Node

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/parser/Node.java

首先要構建樹的節點

需要注意的是只有葉子節點才有pattern

isWild來判斷是否有通配符則是為了來匹配用戶輸入的引數,我們設定路由的時候用:作為通配符

例如設定路由set :group :k :v

命令為:set g1 k1 v1

把 :group和g1匹配,:k和k1匹配,:v和v1匹配

最后我們要決議出 group = g1 , k = k1, v = v1

/**
 * @author weloe
 */
public class Node {
    /**
     * 待匹配的命令引數,只有最后一層才有 例如 set :group :key :value
     */
    private String pattern;

    /**
     * 命令引數的一部分例如 set :group :key :value 中的 :group
     */
    private String part;

    /**
     * 子節點
     */
    private List<Node> children;

    /**
     * 是否是通配符節點
     */
    private boolean isWild;

    public Node() {
        this.children = new LinkedList<>();
        this.part = "";
        this.pattern = "";
    }

    public Node(String part, boolean isWild) {
        this.part = part;
        this.isWild = isWild;
        this.children = new LinkedList<>();
    }
}

注冊節點的方法

	/**
     * 注冊節點
     *
     * @param pattern
     * @param parts
     * @param height
     */
    public void insert(String pattern, String[] parts, int height) {
        // 終止條件,height匹配完,到了最下層
        if (parts.length == height) {
            this.pattern = pattern;
            return;
        }

        String part = parts[height];
        // 匹配出一個子節點
        Node child = matchChild(part);
        if (child == null) {
            // 如果當前part的第一個字符是":"或者"*"就為模糊匹配
            child = new Node(part, part.startsWith(":") || part.startsWith("*"));
            // 增加當前節點的子節點
            children.add(child);
        }
        child.insert(pattern, parts, height + 1);
    }

注冊節點方法中呼叫的machChild()為 根據part部分匹配子節點的方法

	/**
     * 根據part匹配子節點
     *
     * @param part
     * @return 第一個匹配節點
     */
    public Node matchChild(String part) {
        for (Node child : children) {
            if (child.part.equals(part) || child.isWild) {
                return child;
            }
        }
        return null;
    }

根據字串陣列(輸入命令)來遞回匹配出對應葉子節點的方法

	/**
     * 根據parts[]匹配出節點
     * @param parts
     * @param height
     * @return
     */
    public Node search(String[] parts, int height) {
        // 匹配到末端
        if(parts.length == height || part.startsWith("*")){
            if(pattern == null){
                return null;
            }
            // 匹配到節點
            return this;
        }

        String part = parts[height];
        // 根據part找到匹配的子節點
        List<Node> children = matchChildren(part);

        for (Node child : children) {
            Node node = child.search(parts, height + 1);
            if(node != null){
                return node;
            }
        }

        return null;
    }

search方法中呼叫的matchChildren為 根據part來匹配出所有符合的子節點的方法

    /**
     * 根據part匹配子節點
     * @param part
     * @return 所有匹配的節點
     */
    public List<Node> matchChildren(String part) {
        ArrayList<Node> nodes = new ArrayList<>();
        for (Node child : children) {
            if (child.part.equals(part) || child.isWild) {
                nodes.add(child);
            }
        }
        return nodes;
    }

Router

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/parser/Router.java

Router提供了添加路由和根據用戶命令匹配路由的方法

這里的HandlerFunc是需要我們在添加路由addRoute時進行設定的命令對應的處理函式

這里的Route類是我們在getRoute時回傳的,node就是匹配到的樹的葉子節點,map為匹配到的通配符引數

例如設定路由set :group :k :v

命令為:set g1 k1 v1

把 :group和g1匹配,:k和k1匹配,:v和v1匹配

map即為 group = g1 , k = k1, v = v1

/**
 * @author weloe
 */
public class Router {

    @FunctionalInterface
    public interface HandlerFunc {
        Object handle(RESPContext context);
    }

    public class Route{
        private Node node;
        private Map<String,String> map;

        public Route(Node node, Map<String, String> map) {
            this.node = node;
            this.map = map;
        }

        public Object handle(RESPContext context){
            context.setParamMap(map);
            String key = "command"+"-"+node.getPattern();
            return handlers.get(key).handle(context);
        }

        public Map<String, String> getMap() {
            return map;
        }

        public Node getNode() {
            return node;
        }
    }

    /**
     * 根節點
     */
    private Map<String,Node> roots;

    private Map<String,HandlerFunc> handlers;


    public Router() {
        this.roots = new LinkedHashMap<>();
        this.handlers = new LinkedHashMap<>();
    }

    /**
     * 決議pattern
     * @param pattern
     * @return
     */
    public String[] parsePattern(String pattern){
        String[] patterns = pattern.split(" ");

        String[] parts = new String[patterns.length];
        for (int i = 0; i < patterns.length; i++) {
            parts[i] = patterns[i];
            if(patterns[i].charAt(0) == '*'){
                break;
            }
        }

        return parts;
    }

    public void addRoute(String method,String pattern,HandlerFunc handler){
        String[] parts = parsePattern(pattern);

        String key = method + "-" + pattern;
        Node node = roots.get(method);
        // 判斷有沒有該method對應的根節點,沒有就建一個
        if (node == null) {
            roots.put(method,new Node());
        }
        roots.get(method).insert(pattern,parts,0);
        handlers.put(key,handler);
    }

    public Route getRoute(String path){
        return getRoute("command",path);
    }

    public Route getRoute(String method,String path){

        String[] patterns = parsePattern(path);
        Map<String, String> params = new LinkedHashMap<>();

        Node root = roots.get(method);
        if(root == null){
            return null;
        }
        Node res = root.search(patterns, 0);

        if (res == null) {
            return null;
        }
        String[] parts = parsePattern(res.getPattern());
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            if (part.charAt(0) == ':') {
                params.put(part.substring(1),patterns[i]);
            }
            if(part.charAt(0) == '*' && part.length() > 1){
                String collect = Arrays.stream(patterns).skip(i).collect(Collectors.joining(" "));
                params.put(part.substring(1),collect);
                break;
            }

        }

        return new Route(res,params);
    }

}

測驗

這里相當于我們輸入delete g1 k1 v1

匹配到命令為 delete

匹配到的引數為key:k1

@Test
    void getRoute() {
        Router router = new Router();

        router.addRoute("command","set :group :key :v",null);
        router.addRoute("command","delete :group :key :v",null);
        router.addRoute("command","expire :group :key :v",null);
        router.addRoute("command","get :group :key :v",null);
        router.addRoute("command","hget :key :field",null);
        router.addRoute("command","config set maxsize :size",null);
        router.addRoute("command","config set maxnum :num",null);
        router.addRoute("command","config set cachestrategy :strategy",null);

        Router.Route route = router.getRoute("command", "delete g1 k1 v1");
        Assertions.assertEquals(route.getNode().getPattern(),"delete :group :key :v");
        Assertions.assertEquals(route.getMap().get("key"),"k1");
    }

服務端

根據以上內容我們可以封裝出RESPContext,RESPRqeuest,以及RESPReqspnse類

RESPContext

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/resp/RESPContext.java

作為一個服務端自然有背景關系資訊,我們將它封裝為一個Context類

/**
 * @author weloe
 */
public class RESPContext {

    private LocalDateTime startTime;

    private Socket socket;

    private RESPRequest request;

    private RESPResponse response;

    private Router.Route route;

    private Map<String,String> paramMap;

    private Group group;


    public void initServer(Socket socket, RESPRequest request, RESPResponse response) throws IOException {

        this.socket = socket;
        this.request = request;
        this.response = response;
        this.startTime = LocalDateTime.now();
    }


    /**
     * 決議RESP協議的位元組
     *
     * @return
     */
    public String parseRESPBytes() {
        String result = request.parseRESPBytes();

        return result;
    }

    /**
     * 決議RESP協議的String
     *
     * @param raw
     * @return
     */
    public Object parseRESPString(String raw) {
        Object obj = request.parseRESPString(raw);
        return obj;
    }

    public void ok() {
        response.ok();
    }

    public void ok(byte[] bytes){
        response.ok(String.valueOf(bytes));
    }

    public void ok(String arg) {
        response.ok(arg);
    }

    public void ok(String... args) {
        response.ok(args);
    }

    public void error(String msg) {
        response.error(msg);
    }

    public void close() {
        if (request != null) {
            try {
                request.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (response != null) {
            response.close();
        }

        if (socket != null) {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    public Socket getSocket() {
        return socket;
    }

    public RESPRequest getRequest() {
        return request;
    }

    public RESPResponse getResponse() {
        return response;
    }

    public LocalDateTime getStartTime() {
        return startTime;
    }

    public void setStartTime(LocalDateTime startTime) {
        this.startTime = startTime;
    }

    public void setParamMap(Map<String, String> paramMap) {
        this.paramMap = paramMap;
    }

    public Map<String, String> getParamMap() {
        return paramMap;
    }

    public void setRoute(Router.Route route) {
        this.route = route;
    }

    public Router.Route getRoute() {
        return route;
    }

    public void setGroup(Group group) {
        this.group = group;
    }

    public Group getGroup() {
        return group;
    }

    public String getParam(String key){
        return paramMap.get(key);
    }
}

RESPRequest

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/resp/RESPRequest.java

/**
 * @author weloe
 */
public class RESPRequest {
    private InputStream inputStream;

    private Map<String,String> params;

    public RESPRequest(Socket socket) throws IOException {
        this.inputStream = socket.getInputStream();
    }

    public String getParam(String key) {
        return params.get(key);
    }

    /**
     * 決議RESP協議的位元組
     *
     * @return
     */
    public byte[] getBytes() {
        byte[] bytes = null;

        try {
            while (inputStream.available() == 0) {
            }
            bytes = new byte[inputStream.available()];
            inputStream.read(bytes);

        } catch (IOException e) {
            e.printStackTrace();
        }

        return bytes;
    }

    /**
     * 決議RESP協議的位元組
     *
     * @return
     */
    public String parseRESPBytes() {
        byte[] bytes;
        byte[] buf = new byte[1];
        String result = null;
        try {
            System.out.println("等待資料傳輸");
//            try {
//                // 讀不到阻塞
//                inputStream.read(buf);
//            } catch (IOException e) {
//                return null;
//            }
//            result = new String(buf) + new String(bytes);
            while (inputStream.available() == 0) {
            }
            bytes = new byte[inputStream.available()];
            inputStream.read(bytes);
            result = new String(bytes);

        } catch (IOException e) {
            e.printStackTrace();
        }

        return result;
    }

    /**
     * 決議RESP協議的String
     *
     * @param raw
     * @return
     */
    public Object parseRESPString(String raw) {
        byte type = raw.getBytes()[0];
        String result = raw.substring(1);
        switch (type) {
            case '+':
                // +ok\r\n
                // 讀單行
                return result.replace("\r\n", "");
            case '-':
                // 例外
                // -Error msg\r\n
                throw new RuntimeException(result.replace("\r\n", ""));
            case ':':
                // 數字
                return result.replace("\r\n", "");
            case '$':
                return result.substring(1).replace("\r\n", "");
            case '*':
                // 多行字串
                String[] strList = result.substring(result.indexOf("$")).split("\r\n");
                System.out.print("多條批量請求:");
                List<String> list = new LinkedList<>();
                for (int i = 1; i < strList.length; i += 2) {
                    System.out.print(strList[i] + " ");
                    list.add(strList[i]);
                }
                System.out.println();
                return list;
            default:
                throw new RuntimeException("錯誤的資料格式");
        }
    }

    public void close() throws IOException {
        if (inputStream != null) {
            inputStream.close();
        }
    }


    public InputStream getInputStream() {
        return inputStream;
    }
}

RESPResponse

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/resp/RESPResponse.java

/**
 * @author weloe
 */
public class RESPResponse {
    private PrintWriter writer;

    public RESPResponse(Socket socket) throws IOException {
        // 字符輸出流,可以直接按行輸出
        writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
    }

    public void ok() {
        writer.println(RESPStatus.OK.getMsg());
        writer.flush();
    }

    public void ok(Integer arg) {
        writer.println(":" + arg);
        writer.flush();
    }

    public void ok(String arg) {
        writer.println("$" + arg.getBytes(StandardCharsets.UTF_8).length);
        writer.println(arg);
        writer.flush();
    }

    public void ok(String... args) {
        writer.println("*" + args.length);
        for (String arg : args) {
            writer.println("$" + arg.getBytes(StandardCharsets.UTF_8).length);
            writer.println(arg);
        }
        writer.flush();
    }

    public void error(String msg) {
        writer.println(RESPStatus.ERROR.getMsg() + " " + msg);
        writer.flush();
    }

    public void close() {
        if (writer != null) {
            writer.close();
        }
    }

    public PrintWriter getWriter() {
        return writer;
    }


}

Service

https://github.com/weloe/Java-Distributed-Cache/tree/master/src/main/java/com/weloe/cache/server/command

命令有對應的HandlerFunc函式,我們就需要在函式中呼叫我們的相應的service,因此需要抽象出對應的service

這里的service為對前篇快取管理中的類的簡單呼叫,就不再一一贅述

ConfigService

/**
 * config操作
 * @author weloe
 */
public class ConfigService {

    public Object getNormalSize(Group group){
        return group.getNormalSize();
    }

    public Object getMaxSize(Group group){
        return group.getMaxSize();
    }

    public Object setMaxSize(Group group,String size) {
        group.setMaxSize(Integer.parseInt(size));
        return "";
    }

    public Object setMaxNum(Group group,String num) {
        return null;
    }

}

DeleteService

/**
 * 洗掉key相關操作
 * @author weloe
 */
public class DeleteService {

    public Object delete(Group group,String key) {
        CacheObj delete = group.delete(key);
        if(delete == null){
            return null;
        }
        return "";
    }


    public Object clear(Group group) {
        group.clear();
        return "";
    }
}

ExpireService

/**
 * 設定,獲取key的過期時間,單位秒
 * @author weloe
 */
public class ExpireService {

    public Object expire(Group group,String key,String value) {
        LocalDateTime time = group.expire(key, Long.parseLong(value), ChronoUnit.SECONDS);
        return time;
    }

    public Object ttl(Group group,String key) {
        long ttl = group.ttl(key);
        return ttl;
    }
}

GroupService

/**
 * group操作
 * @author weloe
 */
public class GroupService {

    private GroupManager groupManager = GroupManager.getInstance();

    public Object add(String groupName) {

        Group group = new Group();
        group.setName(groupName);
        group.setCache(new Cache());
        group.setGetter(k -> null);
        groupManager.put(group);

        return "";
    }


}

StringService

/**
 * 快取set,get操作
 * @author weloe
 */
public class StringService {

    public String get(Group group,String key) {
        CacheObj cacheObj = group.get(key);
        if(cacheObj != null){
            return new String(cacheObj.getData());
        }
        return null;
    }


    public Object set(Group group,String key,String value) {
        group.putCacheObj(key, new CacheObj(value.getBytes(StandardCharsets.UTF_8)));
        return "";
    }
}

ServiceFactory

為了方便管理,另外寫了管理service的類

public class ServiceFactory {

    Map<String,Object> map;

    public ServiceFactory() {
        map = new LinkedHashMap<>();
        map.put("str",new StringService());
        map.put("group",new GroupService());
        map.put("config",new ConfigService());
        map.put("delete",new DeleteService());
        map.put("expire",new ExpireService());
    }


    public<T> T getBean(String name){
        return (T) map.get(name);
    }
}

CommandQueue

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/parser/CommandQueue.java

這里我們使用多執行緒io決議命令,單執行緒執行命令的模型,多執行緒決議完命令得到Route后把Context加入到阻塞佇列中,再消費阻塞佇列的Context執行命令

public class CommandQueue {

    private static PriorityBlockingQueue<RESPContext> commandQueue = new PriorityBlockingQueue<>(5, (o1, o2) -> {
        if (o1.getStartTime().isBefore(o2.getStartTime())) {
            return -1;
        }
        return 1;
    });

    public boolean add(RESPContext context){
        return commandQueue.add(context);
    }


    public void consume() {
        new Thread(() -> {
            System.out.println("服務端等待接收命令...");
            while (true) {
                RESPContext respContext = null;
                try {
                    respContext = commandQueue.take();
                } catch (InterruptedException e) {
                    respContext.error(e.getMessage());
                    e.printStackTrace();
                    continue;
                }

                System.out.println("執行命令"+respContext.getRoute().getNode().getPattern());
                // 執行命令
                Object handle = respContext.getRoute().handle(respContext);
                System.out.println(handle);
                if(handle == null){
                    respContext.ok("nil");
                }else if(handle.equals("")){
                    respContext.ok();
                }else {
                    respContext.ok(handle.toString());
                }
            }
        }).start();


    }

    public PriorityBlockingQueue<RESPContext> getCommandQueue() {
        return commandQueue;
    }
}

Launch服務端啟動類

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/Launch.java

public class Launch {
    private static final ExecutorService poolExecutor = new ThreadPoolExecutor(2, 5,
            30L, TimeUnit.SECONDS,
            new ArrayBlockingQueue(10));

    private static final GroupManager groupManager = GroupManager.getInstance();

    private static final ServiceFactory factory = new ServiceFactory();

    private static final Router router = new Router();

    static {
        // 注冊路由資訊
        ConfigService config = factory.getBean("config");
        StringService str = factory.getBean("str");
        DeleteService delete = factory.getBean("delete");
        ExpireService expire = factory.getBean("expire");
        GroupService groupService = factory.getBean("group");

        router.addRoute("group add :name", c -> groupService.add(c.getParam("name")));

        router.addRoute("config set maxByteSize :group :size", c -> config.setMaxSize(c.getGroup(),c.getParam("size")))
              .addRoute("config get maxByteSize :group", c -> config.getMaxSize(c.getGroup()))
              .addRoute("config get normalSize :group", c -> config.getNormalSize(c.getGroup()))
              .addRoute("config set maxNum :group :num", c -> config.setMaxNum(c.getGroup(),c.getParam("num")));


        router.addRoute("expire :group :k :n", c -> expire.expire(c.getGroup(),c.getParam("k"),c.getParam("n")))
              .addRoute("ttl :group :k", c -> expire.ttl(c.getGroup(),c.getParam("k")));

        router.addRoute("delete :group :k", c -> delete.delete(c.getGroup(),c.getParam("size")))
              .addRoute("clear :group", c -> delete.clear(c.getGroup()));

        router.addRoute("set :group :k :v", c -> str.set(c.getGroup(),c.getParam("k"),c.getParam("v")))
              .addRoute("get :group :k", c -> str.get(c.getGroup(),c.getParam("k")));


    }


    public static void main(String[] args) throws IOException {

        CommandQueue commandQueue = new CommandQueue();
        commandQueue.consume();

        ServerSocket serverSocket = new ServerSocket(8081);

        while (true) {
            // 初始化server
            Socket socket = serverSocket.accept();
            System.out.println(socket.getInetAddress() + ":" + socket.getPort() + "連接");
            poolExecutor.submit(() -> task(commandQueue, socket));

        }


    }

    private static void task(CommandQueue commandQueue, Socket socket) {
        RESPContext context = null;

        System.out.println("執行緒"+Thread.currentThread().getId()+" 執行");
        try {
            while (true) {
                context = new RESPContext();
                context.initServer(socket,new RESPRequest(socket),new RESPResponse(socket));

                Object requestData = https://www.cnblogs.com/weloe/p/null;
                try {
                    // 處理請求
                    String res = context.parseRESPBytes();
                    if(res == null){
                        return;
                    }
                    System.out.printf("%s => %s%n", "原始格式", res.replace("\r\n", "\\r\\n"));
                    requestData = https://www.cnblogs.com/weloe/p/context.parseRESPString(res);

                } catch (Exception e) {
                    System.out.println(e.getMessage());
                    context.error(e.getMessage());
                    continue;
                }

                List commandStr = (List) requestData;

                System.out.println("接收到" + socket.getInetAddress() + ":" + socket.getPort() +"的命令");

                // 決議命令
                Router.Route route = router.getRoute(String.join(" ",commandStr));
                if (route == null) {
                    context.ok("請檢查你的命令引數");
                    continue;
                }

                Map<String, String> paramMap = route.getMap();
                String name = paramMap.get("group");
                if(name != null){
                    if(groupManager.getGroup(name) == null){
                        context.ok("該group不存在");
                        continue;
                    }
                    context.setGroup(groupManager.getGroup(name));
                }
                context.setRoute(route);


                // 命令加入阻塞佇列
                commandQueue.add(context);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            context.close();
        }
    }

}

Client

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/client/Client.java

public class ClientLaunch {
    static Socket socket;
    static PrintWriter writer;
    static BufferedReader reader;
    static InputStream inputStream;

    static String host = "127.0.0.1";
    static int port = 8081;

    public static void main(String[] args) {

        try {
            // 建立連接
            socket = new Socket(host,port);
            // 獲取輸出輸入流
            // 字符輸出流,可以直接按行輸出
            writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
            inputStream = socket.getInputStream();

            while (true) {
                Scanner reader = new Scanner(System.in);
                if (reader.hasNextLine()) {
                    String s = reader.nextLine();
                    if("exit".equals(s)){
                        System.out.println("exit cli");
                        return;
                    }
                    if (!s.isEmpty()) {
                        Object obj;
                        // 操作命令
                        sendRequest(s.split(" "));
                        // 回應
                        obj = inputStreamHandleByteResponse();
                        System.out.println(obj);
                    }
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // 釋放連接
            try {
                if(reader != null) {
                    reader.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                if(inputStream != null) {
                    inputStream.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                if(writer != null) {
                    writer.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                if(socket != null) {
                    socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    private static void sendRequest(String ... args) {

        writer.println("*"+args.length);
        for (String arg : args) {
            writer.println("$"+arg.getBytes(StandardCharsets.UTF_8).length);
            writer.println(arg);
        }

        writer.flush();

    }


    public static Object inputStreamHandleByteResponse() {
        String result = RESPUtil.parseRESPBytes(inputStream);
        return RESPUtil.parseRESPString(result);
    }


}

測驗

最后我們就可以啟動Launch和Client來進行一下使用了

服務端啟動

服務端等待接收命令...
/127.0.0.1:53779連接
執行緒13 執行
等待資料傳輸

客戶端發送請求group add test

服務端回傳

ok

服務端輸出

原始格式 => *3\r\n$5\r\ngroup\r\n$3\r\nadd\r\n$4\r\ntest\r\n
多條批量請求:group add test 
接收到/127.0.0.1:53779的命令
執行命令group add :name

客戶端設定快取set test testKey testV

服務端回傳

ok

服務端輸出

原始格式 => *4\r\n$3\r\nset\r\n$4\r\ntest\r\n$7\r\ntestKey\r\n$5\r\ntestV\r\n
多條批量請求:set test testKey testV 
接收到/127.0.0.1:53779的命令
執行命令set :group :k :v

客戶端get操作get test testKey

服務端回傳

testV

至此,單機的快取就基本完成~

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/543248.html

標籤:Java

上一篇:Redis 異步客戶端選型及落地實踐

下一篇:SpringBoot

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more