-
Notifications
You must be signed in to change notification settings - Fork 12.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #5 from lostcharlie/hybrid-logical-clock
Implement the hybrid logical clock
- Loading branch information
Showing
3 changed files
with
262 additions
and
0 deletions.
There are no files selected for viewing
79 changes: 79 additions & 0 deletions
79
.../src/main/java/com/alibaba/nacos/naming/consistency/weak/tree/hlc/HybridLogicalClock.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright 1999-2019 Alibaba Group Holding Ltd. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.alibaba.nacos.naming.consistency.weak.tree.hlc; | ||
|
||
/** | ||
* A hybrid logical clock consists of two parts: | ||
* - wall time: the maximum physical time it has seen | ||
* - logical clock: the logical clock | ||
* | ||
* @author lostcharlie | ||
*/ | ||
public class HybridLogicalClock { | ||
private String processName; | ||
private long wallTime; | ||
private long logicalClock; | ||
|
||
public String getProcessName() { | ||
return processName; | ||
} | ||
|
||
private void setProcessName(String processName) { | ||
this.processName = processName; | ||
} | ||
|
||
public long getWallTime() { | ||
return wallTime; | ||
} | ||
|
||
private void setWallTime(long wallTime) { | ||
this.wallTime = wallTime; | ||
} | ||
|
||
public long getLogicalClock() { | ||
return logicalClock; | ||
} | ||
|
||
private void setLogicalClock(long logicalClock) { | ||
this.logicalClock = logicalClock; | ||
} | ||
|
||
public HybridLogicalClock(String processName, long wallTime, long logicalClock) { | ||
this.setProcessName(processName); | ||
this.setWallTime(wallTime); | ||
this.setLogicalClock(logicalClock); | ||
} | ||
|
||
public HybridLogicalClock(String processName) { | ||
this(processName, 0, 0); | ||
} | ||
|
||
public void set(long wallTime, long logicalClock) { | ||
this.setWallTime(wallTime); | ||
this.setLogicalClock(logicalClock); | ||
} | ||
|
||
public boolean smallerThan(HybridLogicalClock another) { | ||
if (this.getWallTime() < another.getWallTime()) { | ||
return true; | ||
} | ||
if ((this.getWallTime() == another.getWallTime()) | ||
&& (this.getLogicalClock() < another.getLogicalClock())) { | ||
return true; | ||
} | ||
return false; | ||
} | ||
} |
107 changes: 107 additions & 0 deletions
107
...ava/com/alibaba/nacos/naming/consistency/weak/tree/hlc/HybridLogicalClockCoordinator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* Copyright 1999-2019 Alibaba Group Holding Ltd. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.alibaba.nacos.naming.consistency.weak.tree.hlc; | ||
|
||
import org.springframework.beans.factory.annotation.Value; | ||
import org.springframework.stereotype.Component; | ||
|
||
import java.util.UUID; | ||
|
||
/** | ||
* @author lostcharlie | ||
*/ | ||
@Component | ||
public class HybridLogicalClockCoordinator { | ||
private static final long STEPS = 1; | ||
private HybridLogicalClock current; | ||
private long maxOffset; | ||
|
||
public HybridLogicalClock getCurrent() { | ||
return current; | ||
} | ||
|
||
private void setCurrent(HybridLogicalClock current) { | ||
this.current = current; | ||
} | ||
|
||
public long getMaxOffset() { | ||
return maxOffset; | ||
} | ||
|
||
@Value("${nacos.naming.tree.hlc.maxOffset:500}") | ||
private void setMaxOffset(long maxOffset) { | ||
this.maxOffset = maxOffset; | ||
} | ||
|
||
public HybridLogicalClockCoordinator() { | ||
this.setCurrent(new HybridLogicalClock(UUID.randomUUID().toString())); | ||
} | ||
|
||
private long getPhysicalTime() { | ||
return System.currentTimeMillis(); | ||
} | ||
|
||
public synchronized HybridLogicalClock generateForSending() { | ||
long currentWallTime = this.getCurrent().getWallTime(); | ||
long currentLogicalClock = this.getCurrent().getLogicalClock(); | ||
long currentPhysicalTime = this.getPhysicalTime(); | ||
long targetWallTime = Math.max(currentWallTime, currentPhysicalTime); | ||
long targetLogicalClock; | ||
if (targetWallTime == currentWallTime) { | ||
targetLogicalClock = currentLogicalClock + HybridLogicalClockCoordinator.STEPS; | ||
} else { | ||
targetLogicalClock = 0; | ||
} | ||
this.getCurrent().set(targetWallTime, targetLogicalClock); | ||
return new HybridLogicalClock(this.getCurrent().getProcessName(), targetWallTime, targetLogicalClock); | ||
} | ||
|
||
public synchronized HybridLogicalClock generateForReceiving(HybridLogicalClock remoteClock) { | ||
long currentWallTime = this.getCurrent().getWallTime(); | ||
long currentLogicalClock = this.getCurrent().getLogicalClock(); | ||
long remoteWallTime = remoteClock.getWallTime(); | ||
long remoteLogicalClock = remoteClock.getLogicalClock(); | ||
long currentPhysicalTime = this.getPhysicalTime(); | ||
long targetWallTime = Math.max(currentWallTime, Math.max(remoteWallTime, currentPhysicalTime)); | ||
long targetLogicalClock; | ||
if (targetWallTime == currentWallTime && targetWallTime == remoteWallTime) { | ||
targetLogicalClock = Math.max(currentLogicalClock, remoteLogicalClock) + HybridLogicalClockCoordinator.STEPS; | ||
} else if (targetWallTime == currentWallTime) { | ||
targetLogicalClock = currentLogicalClock + HybridLogicalClockCoordinator.STEPS; | ||
} else if (targetWallTime == remoteWallTime) { | ||
targetLogicalClock = remoteLogicalClock + HybridLogicalClockCoordinator.STEPS; | ||
} else { | ||
targetLogicalClock = 0; | ||
} | ||
this.getCurrent().set(targetWallTime, targetLogicalClock); | ||
return new HybridLogicalClock(this.getCurrent().getProcessName(), targetWallTime, targetLogicalClock); | ||
} | ||
|
||
public boolean isHappenBefore(HybridLogicalClock former, HybridLogicalClock latter) { | ||
// Currently, for events happen on different processes, it returns true only if the former event | ||
// happens before the latter event in real time. | ||
// The maximum offset of real time between two nodes is defined in the field "maxOffset". | ||
if (former.getProcessName().equals(latter.getProcessName())) { | ||
return former.smallerThan(latter); | ||
} else { | ||
return ((former.getWallTime() + this.getMaxOffset()) < latter.getWallTime()); | ||
} | ||
} | ||
|
||
public boolean isConcurrent(HybridLogicalClock former, HybridLogicalClock latter) { | ||
return (!this.isHappenBefore(former, latter) && !this.isHappenBefore(latter, former)); | ||
} | ||
} |
76 changes: 76 additions & 0 deletions
76
.../test/java/com/alibaba/nacos/naming/consistency/weak/tree/hlc/HybridLogicalClockTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Copyright 1999-2019 Alibaba Group Holding Ltd. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.alibaba.nacos.naming.consistency.weak.tree.hlc; | ||
|
||
import org.junit.Assert; | ||
import org.junit.Test; | ||
import org.springframework.test.util.ReflectionTestUtils; | ||
|
||
/** | ||
* @author lostcharlie | ||
*/ | ||
public class HybridLogicalClockTest { | ||
|
||
@Test | ||
public void testGenerateForSending() throws Exception { | ||
HybridLogicalClockCoordinator coordinator = new HybridLogicalClockCoordinator(); | ||
ReflectionTestUtils.setField(coordinator, "maxOffset", 100); | ||
HybridLogicalClock timestampOne = coordinator.generateForSending(); | ||
HybridLogicalClock timestampTwo = coordinator.generateForSending(); | ||
Thread.sleep(150); | ||
HybridLogicalClock timestampThree = coordinator.generateForSending(); | ||
Assert.assertTrue(timestampOne.smallerThan(timestampTwo)); | ||
Assert.assertTrue(coordinator.isHappenBefore(timestampOne, timestampTwo)); | ||
Assert.assertFalse(coordinator.isConcurrent(timestampOne, timestampTwo)); | ||
Assert.assertTrue(coordinator.isHappenBefore(timestampOne, timestampThree)); | ||
Assert.assertTrue(coordinator.isHappenBefore(timestampTwo, timestampThree)); | ||
} | ||
|
||
@Test | ||
public void testGenerateForReceiving() throws Exception { | ||
HybridLogicalClockCoordinator coordinatorOne = new HybridLogicalClockCoordinator(); | ||
HybridLogicalClockCoordinator coordinatorTwo = new HybridLogicalClockCoordinator(); | ||
ReflectionTestUtils.setField(coordinatorOne, "maxOffset", 100); | ||
ReflectionTestUtils.setField(coordinatorTwo, "maxOffset", 100); | ||
HybridLogicalClock timestampOne = coordinatorOne.generateForSending(); | ||
HybridLogicalClock timestampTwo = coordinatorTwo.generateForSending(); | ||
HybridLogicalClock timestampThree = coordinatorTwo.generateForReceiving(timestampOne); | ||
HybridLogicalClock timestampFour = coordinatorOne.generateForReceiving(timestampTwo); | ||
Assert.assertTrue(coordinatorOne.isHappenBefore(timestampOne, timestampFour)); | ||
Assert.assertTrue(coordinatorOne.isHappenBefore(timestampTwo, timestampThree)); | ||
Assert.assertTrue(coordinatorOne.isConcurrent(timestampOne, timestampTwo)); | ||
Assert.assertTrue(coordinatorOne.isConcurrent(timestampThree, timestampFour)); | ||
} | ||
|
||
@Test | ||
public void testHappenBefore() throws Exception { | ||
HybridLogicalClockCoordinator coordinatorOne = new HybridLogicalClockCoordinator(); | ||
HybridLogicalClockCoordinator coordinatorTwo = new HybridLogicalClockCoordinator(); | ||
ReflectionTestUtils.setField(coordinatorOne, "maxOffset", 100); | ||
ReflectionTestUtils.setField(coordinatorTwo, "maxOffset", 100); | ||
HybridLogicalClock timestampOne = coordinatorOne.generateForSending(); | ||
HybridLogicalClock timestampTwo = coordinatorTwo.generateForSending(); | ||
Thread.sleep(150); | ||
HybridLogicalClock timestampThree = coordinatorOne.generateForSending(); | ||
HybridLogicalClock timestampFour = coordinatorTwo.generateForSending(); | ||
Assert.assertTrue(coordinatorOne.isHappenBefore(timestampOne, timestampThree)); | ||
Assert.assertTrue(coordinatorOne.isHappenBefore(timestampOne, timestampFour)); | ||
Assert.assertTrue(coordinatorOne.isHappenBefore(timestampTwo, timestampThree)); | ||
Assert.assertTrue(coordinatorOne.isHappenBefore(timestampTwo, timestampFour)); | ||
Assert.assertTrue(coordinatorOne.isConcurrent(timestampOne, timestampTwo)); | ||
Assert.assertTrue(coordinatorOne.isConcurrent(timestampThree, timestampFour)); | ||
} | ||
} |