Skip to content

Commit

Permalink
refactor connection handler
Browse files Browse the repository at this point in the history
  • Loading branch information
tarun388 committed Mar 18, 2024
1 parent ba0b1a5 commit af887ce
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 95 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,12 @@ Server expects expects an array of RESP bulk strings per request.
## How to Run
Start Jredis server on localhost port 6379
> java -jar target/jredis-1.0-SNAPSHOT.jar
## Redis benchmark

```
$ redis-benchmark -t SET,GET -q
WARNING: Could not fetch server CONFIG
SET: 12084.59 requests per second, p50=3.703 msec
GET: 17376.19 requests per second, p50=2.703 msec
```
92 changes: 4 additions & 88 deletions src/main/java/com/jredis/ConnectionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@
@Slf4j
public class ConnectionHandler implements Runnable{
private final Socket clientSocket;
private final Serializer serializer;
private final Deserializer deserializer;
private Storage db;
private final RESPRequestHandler requestHandler;

public ConnectionHandler(Socket clientSocket, Serializer serializer, Deserializer deserializer, Storage db) {
public ConnectionHandler(Socket clientSocket, RESPRequestHandler requestHandler) {
this.clientSocket = clientSocket;
this.serializer = serializer;
this.deserializer = deserializer;
this.db = db;
this.requestHandler = requestHandler;
}

@Override
Expand All @@ -39,7 +35,7 @@ private void handleClient(Socket clientSocket) {

while (clientSocket.isConnected()) {
String request = readCommand(reader);
String response = processRequest(request);
String response =requestHandler.processRequest(request);
log.debug("Sending response: " + response);
outputStream.write(response.getBytes());
outputStream.flush();
Expand Down Expand Up @@ -84,85 +80,5 @@ private int readItems(String r) {
}
}

// Only process
// PING
// ECHO MSG
// SET
// GET
// EXISTS
private String processRequest(String request) {
log.debug(request);
Object o = deserializer.deserialize(request);

// ToDo Input validation
// Validate the commands

// Server expects an array of bulk strings
// *<length>\r\n$<len><msg>...
if (o instanceof Object[]) {
String command = (String) ((Object[]) o)[0];

log.debug(String.format("Command : %s", command));

// PING
if (Objects.equals(command, "PING")) {
return serializer.serialize("PONG");
}
// ECHO
else if (Objects.equals(command, "ECHO")) {
return serializer.serialize((String) ((Object[]) o)[1]);
}
else if (Objects.equals(command, "SET")) {
// ToDo Move this blocks of code in another class
// called Command.java
String key = (String) ((Object[]) o)[1];
String value = (String) ((Object[]) o)[2];
Long expiryTime = Storage.INFINITE_EXPIRATION;

// ToDo Only allow either one
// EX | PX | EXAT | PXAT
// source: https://redis.io/commands/set/
for (int i=3;i<((Object[]) o).length;i+=2) {
String c = (String) ((Object[]) o)[i];
String x = (String) ((Object[]) o)[i+1];
if (c.equals("EX")) {
expiryTime = System.currentTimeMillis() + Long.parseLong(x) * 1000L;
}
else if (c.equals("PX")) {
expiryTime = System.currentTimeMillis() + Long.parseLong(x);
}
else if (c.equals("EXAT")) {
expiryTime = Long.parseLong(x) * 1000;
} else if (c.equals("PXAT")) {
expiryTime = Long.parseLong(x);
} else {
return serializer.serializeError("Invalid input");
}
}

db.set(key, value, expiryTime);
return serializer.serialize("OK");
}
else if (Objects.equals(command, "GET")) {
String key = (String) ((Object[]) o)[1];
return serializer.serialize(db.get(key));
}
else if (Objects.equals(command, "EXISTS")) {
String key = (String) ((Object[]) o)[1];
// ToDo Extend Storage class to support bool contains(key)
String value = db.get(key);
if (value == null) {
return serializer.serialize(0);
}
else {
return serializer.serialize(1);
}
}
else if (Objects.equals(command, "DEL")) {
String key = (String) ((Object[]) o)[1];
return serializer.serialize(db.remove(key));
}
}
return serializer.serializeError("Command not supported");
}
}
103 changes: 103 additions & 0 deletions src/main/java/com/jredis/RESPRequestHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.jredis;

import com.jredis.db.Storage;
import com.jredis.serialize.Deserializer;
import com.jredis.serialize.Serializer;
import lombok.extern.slf4j.Slf4j;

import java.util.Objects;

@Slf4j
public class RESPRequestHandler {
private Deserializer deserializer;
private Serializer serializer;
private Storage db;

public RESPRequestHandler(Deserializer deserializer, Serializer serializer, Storage db) {
this.deserializer = deserializer;
this.serializer = serializer;
this.db = db;
}

// Only process
// PING
// ECHO MSG
// SET
// GET
// EXISTS
public String processRequest(String request) {
log.debug(request);
Object o = deserializer.deserialize(request);

// ToDo Input validation
// Validate the commands

// Server expects an array of bulk strings
// *<length>\r\n$<len><msg>...
if (o instanceof Object[]) {
String command = (String) ((Object[]) o)[0];

log.debug(String.format("Command : %s", command));

// PING
if (Objects.equals(command, "PING")) {
return serializer.serialize("PONG");
}
// ECHO
else if (Objects.equals(command, "ECHO")) {
return serializer.serialize((String) ((Object[]) o)[1]);
}
else if (Objects.equals(command, "SET")) {
// ToDo Move this blocks of code in another class
// called Command.java
String key = (String) ((Object[]) o)[1];
String value = (String) ((Object[]) o)[2];
Long expiryTime = Storage.INFINITE_EXPIRATION;

// ToDo Only allow either one
// EX | PX | EXAT | PXAT
// source: https://redis.io/commands/set/
for (int i=3;i<((Object[]) o).length;i+=2) {
String c = (String) ((Object[]) o)[i];
String x = (String) ((Object[]) o)[i+1];
if (c.equals("EX")) {
expiryTime = System.currentTimeMillis() + Long.parseLong(x) * 1000L;
}
else if (c.equals("PX")) {
expiryTime = System.currentTimeMillis() + Long.parseLong(x);
}
else if (c.equals("EXAT")) {
expiryTime = Long.parseLong(x) * 1000;
} else if (c.equals("PXAT")) {
expiryTime = Long.parseLong(x);
} else {
return serializer.serializeError("Invalid input");
}
}

db.set(key, value, expiryTime);
return serializer.serialize("OK");
}
else if (Objects.equals(command, "GET")) {
String key = (String) ((Object[]) o)[1];
return serializer.serialize(db.get(key));
}
else if (Objects.equals(command, "EXISTS")) {
String key = (String) ((Object[]) o)[1];
// ToDo Extend Storage class to support bool contains(key)
String value = db.get(key);
if (value == null) {
return serializer.serialize(0);
}
else {
return serializer.serialize(1);
}
}
else if (Objects.equals(command, "DEL")) {
String key = (String) ((Object[]) o)[1];
return serializer.serialize(db.remove(key));
}
}
return serializer.serializeError("Command not supported");
}
}
10 changes: 3 additions & 7 deletions src/main/java/com/jredis/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@
public class Server {
// Todo Accept port from user
private static final int PORT = 6379;
private final Serializer serializer;
private final Deserializer deserializer;
private Storage db;
private final RESPRequestHandler requestHandler;

public Server() {
this.serializer = new Serializer();
this.deserializer = new Deserializer();
this.db = new Storage();
this.requestHandler = new RESPRequestHandler(new Deserializer(), new Serializer(), new Storage());
}

public void start() {
Expand All @@ -32,7 +28,7 @@ public void start() {
Socket clientSocket = serverSocket.accept();
log.info("Connection established to : {}", clientSocket.getPort());

Thread clientConnection = new Thread(new ConnectionHandler(clientSocket, serializer, deserializer, db));
Thread clientConnection = new Thread(new ConnectionHandler(clientSocket, requestHandler));
log.info("Thread created {}", clientConnection.getId());

clientConnection.start();
Expand Down

0 comments on commit af887ce

Please sign in to comment.