Skip to content

Commit

Permalink
chore: remove AbstractDistributedLock class and lock method
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Sep 10, 2023
1 parent 3125222 commit dc2c9ad
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.meta.lock.DistributedLock;
import org.apache.hugegraph.meta.lock.EtcdDistributedLock;
import org.apache.hugegraph.meta.lock.LockResult;
import org.apache.hugegraph.type.define.CollectionType;
import org.apache.hugegraph.util.E;
Expand Down Expand Up @@ -57,8 +57,8 @@

public class EtcdMetaDriver implements MetaDriver {

private Client client;
private DistributedLock lock;
private final Client client;
private final EtcdDistributedLock lock;

public EtcdMetaDriver(String trustFile, String clientCertFile,
String clientKeyFile, Object... endpoints) {
Expand All @@ -67,13 +67,13 @@ public EtcdMetaDriver(String trustFile, String clientCertFile,
SslContext sslContext = openSslContext(trustFile, clientCertFile,
clientKeyFile);
this.client = builder.sslContext(sslContext).build();
this.lock = DistributedLock.getInstance(this.client);
this.lock = EtcdDistributedLock.getInstance(this.client);
}

public EtcdMetaDriver(Object... endpoints) {
ClientBuilder builder = this.etcdMetaDriverBuilder(endpoints);
this.client = builder.build();
this.lock = DistributedLock.getInstance(this.client);
this.lock = EtcdDistributedLock.getInstance(this.client);
}

private static ByteSequence toByteSequence(String content) {
Expand Down Expand Up @@ -165,7 +165,7 @@ public String get(String key) {
throw new HugeException("Failed to get key '%s' from etcd", e, key);
}

if (keyValues.size() > 0) {
if (!keyValues.isEmpty()) {
return keyValues.get(0).getValue().toString(Charset.defaultCharset());
}

Expand Down Expand Up @@ -230,7 +230,7 @@ public Map<String, String> scanWithPrefix(String prefix) {
CollectionType.JCF, size);
for (KeyValue kv : response.getKvs()) {
String key = kv.getKey().toString(Charset.defaultCharset());
String value = kv.getValue().size() == 0 ? "" :
String value = kv.getValue().isEmpty() ? "" :
kv.getValue().toString(Charset.defaultCharset());
keyValues.put(key, value);
}
Expand Down Expand Up @@ -283,22 +283,13 @@ public LockResult tryLock(String key, long ttl, long timeout) {
return this.lock.tryLock(key, ttl, timeout);
}

@Override
public LockResult lock(String key, long ttl) {
return this.lock.lock(key, ttl);
}

@Override
public boolean isLocked(String key) {
try {
long size = this.client.getKVClient().get(toByteSequence(key))
.get().getCount();

if (size > 0) {
return true;
} else {
return false;
}
return size > 0;
} catch (InterruptedException | ExecutionException e) {
throw new HugeException("Failed to check is locked '%s'", e, key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,22 +509,6 @@ public void notifyGraphEdgeCacheClear(String graphSpace, String graph) {
this.graphMetaManager.notifyGraphEdgeCacheClear(graphSpace, graph);
}

public LockResult lock(String... keys) {
return this.lockMetaManager.lock(keys);
}

public LockResult lock(long ttl, String... keys) {
return this.lockMetaManager.lock(ttl, keys);
}

public LockResult lock(String key, long ttl) {
return this.lockMetaManager.lock(key, ttl);
}

public LockResult lock(String key) {
return this.lockMetaManager.lock(key);
}

public LockResult tryLock(String key) {
return this.lockMetaManager.tryLock(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.hugegraph.meta;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -44,35 +42,15 @@

public class PdMetaDriver implements MetaDriver {

KvClient<WatchResponse> client = null;
PDClient pdClient = null;
private PdDistributedLock lock;
private final KvClient<WatchResponse> client;
private final PDClient pdClient;
private final PdDistributedLock lock;

public PdMetaDriver(String pdPeer) {
PDConfig pdConfig = PDConfig.of(pdPeer);
this.client = new KvClient<>(pdConfig);
this.pdClient = PDClient.create(pdConfig);
lock = new PdDistributedLock(this.client);
}

public static void main(String[] args) {
PDConfig pdConfig = PDConfig.of("127.0.0.1:8686");
KvClient<WatchResponse> client = new KvClient<>(pdConfig);
ScanPrefixResponse contents;
try {
contents = client.scanPrefix("HUGEGRAPH/METRICS");
Map<String, String> map = contents.getKvsMap();
for (Map.Entry<String, String> entry : map.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
}
System.out.println(map.size());
} catch (PDException e) {
e.printStackTrace();
}
Date dNow = new Date();
System.out.println(dNow);
SimpleDateFormat ft = new SimpleDateFormat("HHmmss");
System.out.println("当前时间为: " + ft.format(dNow));
this.lock = new PdDistributedLock(this.client);
}

public PDClient pdClient() {
Expand Down Expand Up @@ -179,22 +157,6 @@ public <T> Map<String, String> extractKVFromResponse(T response) {
return resultMap;
}

@Override
public LockResult lock(String key, long ttl) {
while (true) {
LockResult lock = this.lock.lock(key, ttl);
if (lock.lockSuccess()) {
return lock;
} else {
try {
Thread.sleep(1000L);
} catch (Exception e) {

}
}
}
}

@Override
public LockResult tryLock(String key, long ttl, long timeout) {
return this.lock.lock(key, ttl);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -33,26 +33,33 @@
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Lock;

public class DistributedLock extends AbstractDistributedLock {
public class EtcdDistributedLock {

protected static final Logger LOG = Log.logger(DistributedLock.class);
private static final long UNLIMIT_TIMEOUT = -1L;
protected static final Logger LOG = Log.logger(EtcdDistributedLock.class);
private static final long UNLIMITED_TIMEOUT = -1L;
private final static Object mutex = new Object();
private static DistributedLock lockProvider = null;
private static EtcdDistributedLock lockProvider = null;
private final KV kvClient;
private final Lock lockClient;
private final Lease leaseClient;

private DistributedLock(Client client) {
private static final int poolSize = 8;
private final ScheduledExecutorService service = new ScheduledThreadPoolExecutor(poolSize, r -> {
Thread t = new Thread(r, "keepalive");
t.setDaemon(true);
return t;
});

private EtcdDistributedLock(Client client) {
this.kvClient = client.getKVClient();
this.lockClient = client.getLockClient();
this.leaseClient = client.getLeaseClient();
}

public static DistributedLock getInstance(Client client) {
public static EtcdDistributedLock getInstance(Client client) {
synchronized (mutex) {
if (null == lockProvider) {
lockProvider = new DistributedLock(client);
lockProvider = new EtcdDistributedLock(client);
}
}
return lockProvider;
Expand All @@ -64,13 +71,10 @@ private static ByteSequence toByteSequence(String content) {

public LockResult tryLock(String lockName, long ttl, long timeout) {
LockResult lockResult = new LockResult();
ScheduledExecutorService service =
Executors.newSingleThreadScheduledExecutor();

lockResult.lockSuccess(false);
lockResult.setService(service);

Long leaseId;
long leaseId;

try {
leaseId = this.leaseClient.grant(ttl).get().getID();
Expand All @@ -89,7 +93,7 @@ public LockResult tryLock(String lockName, long ttl, long timeout) {
period, period, TimeUnit.SECONDS);

try {
if (timeout == UNLIMIT_TIMEOUT) {
if (timeout == UNLIMITED_TIMEOUT) {
this.lockClient.lock(toByteSequence(lockName), leaseId).get();

} else {
Expand Down Expand Up @@ -117,12 +121,10 @@ public LockResult tryLock(String lockName, long ttl, long timeout) {
return lockResult;
}

@Override
public LockResult lock(String lockName, long ttl) {
return tryLock(lockName, ttl, UNLIMIT_TIMEOUT);
return tryLock(lockName, ttl, UNLIMITED_TIMEOUT);
}

@Override
public void unLock(String lockName, LockResult lockResult) {
LOG.debug("Thread {} start to unlock {}",
Thread.currentThread().getName(), lockName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class LockResult {
private boolean lockSuccess;
private long leaseId;
private ScheduledExecutorService service;
private ScheduledFuture future;
private ScheduledFuture<?> future;

public void lockSuccess(boolean isLockSuccess) {
this.lockSuccess = isLockSuccess;
Expand All @@ -51,11 +51,11 @@ public void setService(ScheduledExecutorService service) {
this.service = service;
}

public ScheduledFuture getFuture() {
public ScheduledFuture<?> getFuture() {
return future;
}

public void setFuture(ScheduledFuture future) {
public void setFuture(ScheduledFuture<?> future) {
this.future = future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,20 @@
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.grpc.kv.LockResponse;

/**
* @author zhangyingjie
* @date 2022/6/18
**/
public class PdDistributedLock extends AbstractDistributedLock {
public class PdDistributedLock {

private static int poolSize = 8;
private final KvClient client;
private ScheduledExecutorService service = new ScheduledThreadPoolExecutor(poolSize, r -> {
Thread t = new Thread(r);
private static final int poolSize = 8;
private final KvClient<?> client;
private final ScheduledExecutorService service = new ScheduledThreadPoolExecutor(poolSize, r -> {
Thread t = new Thread(r, "keepalive");
t.setDaemon(true);
return t;
});

public PdDistributedLock(KvClient client) {
public PdDistributedLock(KvClient<?> client) {
this.client = client;
}

@Override
public LockResult lock(String key, long second) {
long ttl = second * 1000L;
try {
Expand All @@ -69,12 +64,11 @@ public LockResult lock(String key, long second) {
}
}

@Override
public void unLock(String key, LockResult lockResult) {
try {
LockResponse response = this.client.unlock(key);
boolean succeed = response.getSucceed();
if (succeed == false) {
if (!succeed) {
throw new HugeException("Failed to unlock '%s' to pd", key);
}
if (lockResult.getFuture() != null) {
Expand All @@ -95,6 +89,4 @@ public boolean keepAlive(String key) {
throw new HugeException("Failed to keepAlive '%s' to pd", key);
}
}


}
Loading

0 comments on commit dc2c9ad

Please sign in to comment.