diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index aad5bcb5f0..7becf77c6c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -24,6 +24,7 @@ hive = "3.1.3" iceberg = "1.9.2" # Ensure to update the iceberg version in regtests to keep regtests up-to-date quarkus = "3.25.4" immutables = "2.11.3" +jmh = "1.37" picocli = "4.7.7" scala212 = "2.12.19" spark35 = "3.5.6" @@ -76,6 +77,9 @@ jandex = { module = "io.smallrye.jandex:jandex", version ="3.4.0" } javax-servlet-api = { module = "javax.servlet:javax.servlet-api", version = "4.0.1" } junit-bom = { module = "org.junit:junit-bom", version = "5.13.4" } keycloak-admin-client = { module = "org.keycloak:keycloak-admin-client", version = "26.0.6" } +jcstress-core = { module = "org.openjdk.jcstress:jcstress-core", version = "0.16" } +jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" } +jmh-generator-annprocess = { module = "org.openjdk.jmh:jmh-generator-annprocess", version.ref = "jmh" } logback-classic = { module = "ch.qos.logback:logback-classic", version = "1.5.18" } micrometer-bom = { module = "io.micrometer:micrometer-bom", version = "1.15.3" } microprofile-fault-tolerance-api = { module = "org.eclipse.microprofile.fault-tolerance:microprofile-fault-tolerance-api", version = "4.1.2" } @@ -102,6 +106,8 @@ testcontainers-keycloak = { module = "com.github.dasniko:testcontainers-keycloak threeten-extra = { module = "org.threeten:threeten-extra", version = "1.8.0" } [plugins] +jcstress = { id = "io.github.reyerizo.gradle.jcstress", version = "0.8.15" } +jmh = { id = "me.champeau.jmh", version = "0.7.3" } openapi-generator = { id = "org.openapi.generator", version = "7.12.0" } quarkus = { id = "io.quarkus", version.ref = "quarkus" } rat = { id = "org.nosphere.apache.rat", version = "0.8.1" } diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 27fdae3556..ce9455948b 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -48,3 +48,9 @@ polaris-extensions-federation-hive=extensions/federation/hive polaris-config-docs-annotations=tools/config-docs/annotations polaris-config-docs-generator=tools/config-docs/generator polaris-config-docs-site=tools/config-docs/site + +# id generation +polaris-idgen-api=persistence/nosql/idgen/api +polaris-idgen-impl=persistence/nosql/idgen/impl +polaris-idgen-mocks=persistence/nosql/idgen/mocks +polaris-idgen-spi=persistence/nosql/idgen/spi diff --git a/persistence/nosql/idgen/README.md b/persistence/nosql/idgen/README.md new file mode 100644 index 0000000000..576a983cf1 --- /dev/null +++ b/persistence/nosql/idgen/README.md @@ -0,0 +1,55 @@ + + +# Unique ID generation framework and monotonic clock + +Provides a framework and implementations for unique ID generation, including a monotonically increasing timestamp/clock +source. + +Provides a +[Snowflake-IDs](https://medium.com/@jitenderkmr/demystifying-snowflake-ids-a-unique-identifier-in-distributed-computing-72796a827c9d) +implementation. + +Consuming production should primarily leverage the `IdGenerator` and `MonotonicClock` interfaces. + +## Snowflake ID source + +The Snowflake ID source is configurable for each backend instance, but cannot be modified for an existing backend +instance to prevent ID conflicts. + +The epoch of these timestamps is 2025-03-01-00:00:00.0 GMT. Timestamps occupy 41 bits at +millisecond precision, which lasts for about 69 years. Node-IDs are 10 bits, which allows 1024 concurrently active +"JVMs running Polaris". 12 bits are used by the sequence number, which then allows each node to generate 4096 IDs per +millisecond. One bit is reserved for future use. + +Node IDs are leased by every "JVM running Polaris" for a period of time. The ID generator implementation guarantees +that no IDs will be generated for a timestamp that exceeds the "lease time". Leases can be extended. The implementation +leverages atomic database operations (CAS) for the lease implementation. + +ID generators must not use timestamps before or after the lease period nor must they re-use an older timestamp. This +requirement is satisfied using a monotonic clock implementation. + +## Code structure + +The code is structured into multiple modules. Consuming code should almost always pull in only the API module. + +* `polaris-idgen-api` provides the necessary Java interfaces and immutable types. +* `polaris-idgen-impl` provides the storage agnostic implementation. +* `polaris-idgen-mocks` provides mocks for testing. +* `polaris-idgen-spi` provides the necessary interfaces to construct ID generators. diff --git a/persistence/nosql/idgen/api/build.gradle.kts b/persistence/nosql/idgen/api/build.gradle.kts new file mode 100644 index 0000000000..91f02365aa --- /dev/null +++ b/persistence/nosql/idgen/api/build.gradle.kts @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("org.kordamp.gradle.jandex") + id("polaris-server") +} + +description = "Polaris ID generation API" + +dependencies { + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.inject.api) + compileOnly(libs.jakarta.enterprise.cdi.api) + + compileOnly(libs.smallrye.config.core) + compileOnly(platform(libs.quarkus.bom)) + compileOnly("io.quarkus:quarkus-core") + + compileOnly(project(":polaris-immutables")) + annotationProcessor(project(":polaris-immutables", configuration = "processor")) + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-databind") +} diff --git a/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/IdGenerator.java b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/IdGenerator.java new file mode 100644 index 0000000000..2bc7af8ef1 --- /dev/null +++ b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/IdGenerator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.api; + +/** The primary interface for generating a contention-free ID. */ +public interface IdGenerator { + /** Generate a new, unique ID. */ + long generateId(); + + /** Generate the system ID for a node, solely used for node management. */ + long systemIdForNode(int nodeId); + + default String describeId(long id) { + return Long.toString(id); + } + + IdGenerator NONE = + new IdGenerator() { + @Override + public long generateId() { + throw new UnsupportedOperationException("NONE IdGenerator cannot generate IDs."); + } + + @Override + public long systemIdForNode(int nodeId) { + throw new UnsupportedOperationException("NONE IdGenerator cannot generate IDs."); + } + }; +} diff --git a/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/IdGeneratorSpec.java b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/IdGeneratorSpec.java new file mode 100644 index 0000000000..b1d304c993 --- /dev/null +++ b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/IdGeneratorSpec.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.api; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import io.smallrye.config.WithDefault; +import java.util.Map; +import org.apache.polaris.immutables.PolarisImmutable; +import org.immutables.value.Value; + +@PolarisImmutable +@JsonSerialize(as = ImmutableIdGeneratorSpec.class) +@JsonDeserialize(as = ImmutableIdGeneratorSpec.class) +public interface IdGeneratorSpec { + @WithDefault("snowflake") + String type(); + + Map params(); + + @PolarisImmutable + interface BuildableIdGeneratorSpec extends IdGeneratorSpec { + static ImmutableBuildableIdGeneratorSpec.Builder builder() { + return ImmutableBuildableIdGeneratorSpec.builder(); + } + + @Override + Map params(); + + @Override + @Value.Default + default String type() { + return "snowflake"; + } + } +} diff --git a/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/MonotonicClock.java b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/MonotonicClock.java new file mode 100644 index 0000000000..2bb6fa1610 --- /dev/null +++ b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/MonotonicClock.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.api; + +import java.time.Instant; + +/** + * Provides a clock providing the current time in milliseconds, microseconds and instant since + * 1970-01-01-00:00:00.000. The returned timestamp values increase monotonically. + * + *

The functions provide nanosecond/microsecond/millisecond precision, but not necessarily the + * same resolution (how frequently the value changes) - no guarantees are made. + * + *

Implementation may adjust to wall clocks advancing faster than the real time. If and + * how exactly depends on the implementation, as long as none of the time values available via this + * interface "goes backwards". + * + *

Implementer notes: {@link System#nanoTime() System.nanoTime()} does not guarantee that the + * values will be monotonically increasing when invocations happen from different + * CPUs/cores/threads. + * + *

A default implementation of {@link MonotonicClock} can be injected as an application scoped + * bean in CDI. + */ +public interface MonotonicClock extends AutoCloseable { + /** + * Current timestamp as microseconds since epoch, can be used as a monotonically increasing wall + * clock. + */ + long currentTimeMicros(); + + /** + * Current timestamp as milliseconds since epoch, can be used as a monotonically increasing wall + * clock. + */ + long currentTimeMillis(); + + /** + * Current instant with nanosecond precision, can be used as a monotonically increasing wall + * clock. + */ + Instant currentInstant(); + + /** Monotonically increasing timestamp with nanosecond precision, not related to wall clock. */ + long nanoTime(); + + void sleepMillis(long millis); + + @Override + void close(); + + void waitUntilTimeMillisAdvanced(); +} diff --git a/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/SnowflakeIdGenerator.java b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/SnowflakeIdGenerator.java new file mode 100644 index 0000000000..a0b9b87222 --- /dev/null +++ b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/SnowflakeIdGenerator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.api; + +import jakarta.annotation.Nonnull; +import java.time.Instant; +import java.util.UUID; + +public interface SnowflakeIdGenerator extends IdGenerator { + /** Offset of the snowflake ID generator since the 1970-01-01T00:00:00Z epoch instant. */ + Instant ID_EPOCH = Instant.parse("2025-03-01T00:00:00Z"); + + /** + * Offset of the snowflake ID generator in milliseconds since the 1970-01-01T00:00:00Z epoch + * instant. + */ + long ID_EPOCH_MILLIS = ID_EPOCH.toEpochMilli(); + + int DEFAULT_NODE_ID_BITS = 10; + int DEFAULT_TIMESTAMP_BITS = 41; + int DEFAULT_SEQUENCE_BITS = 12; + + long constructId(long timestamp, long sequence, long node); + + long timestampFromId(long id); + + long timestampUtcFromId(long id); + + long sequenceFromId(long id); + + long nodeFromId(long id); + + UUID idToTimeUuid(long id); + + String idToString(long id); + + long timeUuidToId(@Nonnull UUID uuid); + + int timestampBits(); + + int sequenceBits(); + + int nodeIdBits(); +} diff --git a/persistence/nosql/idgen/impl/build.gradle.kts b/persistence/nosql/idgen/impl/build.gradle.kts new file mode 100644 index 0000000000..abf245fed9 --- /dev/null +++ b/persistence/nosql/idgen/impl/build.gradle.kts @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("org.kordamp.gradle.jandex") + alias(libs.plugins.jmh) + alias(libs.plugins.jcstress) + id("polaris-server") +} + +description = "Polaris ID generation implementation" + +dependencies { + implementation(project(":polaris-idgen-api")) + implementation(project(":polaris-idgen-spi")) + + implementation(libs.guava) + implementation(libs.slf4j.api) + + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.inject.api) + compileOnly(libs.jakarta.enterprise.cdi.api) + + compileOnly(libs.smallrye.config.core) + compileOnly(platform(libs.quarkus.bom)) + compileOnly("io.quarkus:quarkus-core") + + compileOnly(project(":polaris-immutables")) + annotationProcessor(project(":polaris-immutables", configuration = "processor")) + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-databind") + + testFixturesCompileOnly(platform(libs.jackson.bom)) + testFixturesCompileOnly("com.fasterxml.jackson.core:jackson-databind") + + testFixturesCompileOnly(libs.jakarta.inject.api) + testFixturesCompileOnly(libs.jakarta.enterprise.cdi.api) + + testImplementation(project(":polaris-idgen-mocks")) + + jmhImplementation(libs.jmh.core) + jmhAnnotationProcessor(libs.jmh.generator.annprocess) +} + +tasks.named("jcstressJar") { dependsOn("jandex") } + +tasks.named("compileJcstressJava") { dependsOn("jandex") } + +tasks.named("check") { dependsOn("jcstress") } + +jcstress { jcstressDependency = libs.jcstress.core.get().toString() } diff --git a/persistence/nosql/idgen/impl/src/jcstress/java/org/apache/polaris/ids/impl/MonotonicClockStress.java b/persistence/nosql/idgen/impl/src/jcstress/java/org/apache/polaris/ids/impl/MonotonicClockStress.java new file mode 100644 index 0000000000..75b8e745f9 --- /dev/null +++ b/persistence/nosql/idgen/impl/src/jcstress/java/org/apache/polaris/ids/impl/MonotonicClockStress.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.impl; + +import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; +import static org.openjdk.jcstress.annotations.Expect.FORBIDDEN; +import static org.openjdk.jcstress.annotations.Expect.UNKNOWN; + +import java.time.Instant; +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.Description; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.II_Result; + +public class MonotonicClockStress { + public static final MonotonicClockImpl CLOCK = new MonotonicClockImpl().start(); + + @JCStressTest + @Description("Verify that monotonicity is guaranteed across different threads (nanos).") + @Outcome.Outcomes({ + @Outcome(id = "1, 1", expect = ACCEPTABLE, desc = "Both newer"), + @Outcome(id = "1, 0", expect = ACCEPTABLE, desc = "Newer + same time"), + @Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Same time + newer"), + @Outcome(id = "0, 0", expect = ACCEPTABLE, desc = "Both same time"), + @Outcome(id = "-1, .*", expect = FORBIDDEN, desc = "Clock must not go backwards"), + @Outcome(id = ".*, -1", expect = FORBIDDEN, desc = "Clock must not go backwards"), + @Outcome(expect = UNKNOWN, desc = "Not sure what happened"), + }) + @State() + public static class Nanos { + long ref; + + long v1; + long v2; + + public Nanos() { + ref = CLOCK.nanoTime(); + } + + @Actor + public void actor1() { + v1 = CLOCK.nanoTime(); + } + + @Actor + public void actor2() { + v2 = CLOCK.nanoTime(); + } + + @Arbiter + public void arbiter(II_Result r) { + r.r1 = Long.compare(v1, ref); + r.r2 = Long.compare(v2, ref); + } + } + + @JCStressTest + @Description("Verify that monotonicity is guaranteed across different threads (micros).") + @Outcome.Outcomes({ + @Outcome(id = "1, 1", expect = ACCEPTABLE, desc = "Both newer"), + @Outcome(id = "1, 0", expect = ACCEPTABLE, desc = "Newer + same time"), + @Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Same time + newer"), + @Outcome(id = "0, 0", expect = ACCEPTABLE, desc = "Both same time"), + @Outcome(id = "-1, .*", expect = FORBIDDEN, desc = "Clock must not go backwards"), + @Outcome(id = ".*, -1", expect = FORBIDDEN, desc = "Clock must not go backwards"), + @Outcome(expect = UNKNOWN, desc = "Not sure what happened"), + }) + @State() + public static class Micros { + long ref; + + long v1; + long v2; + + public Micros() { + ref = CLOCK.currentTimeMicros(); + } + + @Actor + public void actor1() { + v1 = CLOCK.currentTimeMicros(); + } + + @Actor + public void actor2() { + v2 = CLOCK.currentTimeMicros(); + } + + @Arbiter + public void arbiter(II_Result r) { + r.r1 = Long.compare(v1, ref); + r.r2 = Long.compare(v2, ref); + } + } + + @JCStressTest + @Description("Verify that monotonicity is guaranteed across different threads (millis).") + @Outcome.Outcomes({ + @Outcome(id = "1, 1", expect = ACCEPTABLE, desc = "Both newer"), + @Outcome(id = "1, 0", expect = ACCEPTABLE, desc = "Newer + same time"), + @Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Same time + newer"), + @Outcome(id = "0, 0", expect = ACCEPTABLE, desc = "Both same time"), + @Outcome(id = "-1, .*", expect = FORBIDDEN, desc = "Clock must not go backwards"), + @Outcome(id = ".*, -1", expect = FORBIDDEN, desc = "Clock must not go backwards"), + @Outcome(expect = UNKNOWN, desc = "Not sure what happened"), + }) + @State() + public static class Millis { + long ref; + + long v1; + long v2; + + public Millis() { + ref = CLOCK.currentTimeMillis(); + } + + @Actor + public void actor1() { + v1 = CLOCK.currentTimeMillis(); + } + + @Actor + public void actor2() { + v2 = CLOCK.currentTimeMillis(); + } + + @Arbiter + public void arbiter(II_Result r) { + r.r1 = Long.compare(v1, ref); + r.r2 = Long.compare(v2, ref); + } + } + + @JCStressTest + @Description("Verify that monotonicity is guaranteed across different threads (instants).") + @Outcome.Outcomes({ + @Outcome(id = "1, 1", expect = ACCEPTABLE, desc = "Both newer"), + @Outcome(id = "1, 0", expect = ACCEPTABLE, desc = "Newer + same time"), + @Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Same time + newer"), + @Outcome(id = "0, 0", expect = ACCEPTABLE, desc = "Both same time"), + @Outcome(id = "-1, .*", expect = FORBIDDEN, desc = "Clock must not go backwards"), + @Outcome(id = ".*, -1", expect = FORBIDDEN, desc = "Clock must not go backwards"), + @Outcome(expect = UNKNOWN, desc = "Not sure what happened"), + }) + @State() + public static class Instants { + Instant ref; + + Instant v1; + Instant v2; + + public Instants() { + ref = CLOCK.currentInstant(); + } + + @Actor + public void actor1() { + v1 = CLOCK.currentInstant(); + } + + @Actor + public void actor2() { + v2 = CLOCK.currentInstant(); + } + + @Arbiter + public void arbiter(II_Result r) { + r.r1 = Integer.compare(v1.compareTo(ref), 0); + r.r2 = Integer.compare(v2.compareTo(ref), 0); + } + } +} diff --git a/persistence/nosql/idgen/impl/src/jcstress/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorStress.java b/persistence/nosql/idgen/impl/src/jcstress/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorStress.java new file mode 100644 index 0000000000..8313e38cde --- /dev/null +++ b/persistence/nosql/idgen/impl/src/jcstress/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorStress.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.impl; + +import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; +import static org.openjdk.jcstress.annotations.Expect.FORBIDDEN; +import static org.openjdk.jcstress.annotations.Expect.UNKNOWN; + +import org.apache.polaris.ids.api.IdGenerator; +import org.apache.polaris.ids.spi.IdGeneratorSource; +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.Description; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.I_Result; +import org.openjdk.jcstress.infra.results.Z_Result; + +public class SnowflakeIdGeneratorStress { + public static final MonotonicClockImpl CLOCK = new MonotonicClockImpl().start(); + public static final IdGenerator IDGEN = + new SnowflakeIdGeneratorImpl( + new IdGeneratorSource() { + @Override + public int nodeId() { + return 42; + } + + @Override + public long currentTimeMillis() { + return CLOCK.currentTimeMillis(); + } + }); + + @JCStressTest + @Description("Verify that generated IDs are unique for the same thread.") + @Outcome.Outcomes({ + @Outcome(id = "1", expect = ACCEPTABLE, desc = "Not equal, greater"), + @Outcome(id = "-1", expect = FORBIDDEN, desc = "Not equal, smaller"), + @Outcome(id = "0", expect = FORBIDDEN, desc = "Equal"), + @Outcome(expect = UNKNOWN, desc = "Not sure what happened"), + }) + @State() + public static class SameThread { + @Actor + public void actor(I_Result r) { + var v1 = IDGEN.generateId(); + var v2 = IDGEN.generateId(); + + r.r1 = Long.compare(v2, v1); + } + } + + @JCStressTest + @Description("Verify that generated IDs are unique for the same thread.") + @Outcome.Outcomes({ + @Outcome(id = "false", expect = ACCEPTABLE, desc = "Not equal"), + @Outcome(id = "true", expect = FORBIDDEN, desc = "Equal"), + @Outcome(expect = UNKNOWN, desc = "Not sure what happened"), + }) + @State() + public static class DifferentThreads { + long v1; + long v2; + + @Actor + public void actor1() { + v1 = IDGEN.generateId(); + } + + @Actor + public void actor2() { + v1 = IDGEN.generateId(); + } + + @Arbiter + public void arbiter(Z_Result r) { + r.r1 = v1 == v2; + } + } +} diff --git a/persistence/nosql/idgen/impl/src/jmh/java/org/apache/polaris/ids/impl/MonotonicClockBench.java b/persistence/nosql/idgen/impl/src/jmh/java/org/apache/polaris/ids/impl/MonotonicClockBench.java new file mode 100644 index 0000000000..5575cfe825 --- /dev/null +++ b/persistence/nosql/idgen/impl/src/jmh/java/org/apache/polaris/ids/impl/MonotonicClockBench.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.impl; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import java.time.Instant; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@Warmup(iterations = 3, time = 1000, timeUnit = MILLISECONDS) +@Measurement(iterations = 6, time = 1000, timeUnit = MILLISECONDS) +@Fork(1) +@Threads(4) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(NANOSECONDS) +public class MonotonicClockBench { + @State(Scope.Benchmark) + public static class BenchmarkParam { + MonotonicClockImpl monotonicClock; + MonotonicClockImpl monotonicClockIdle; + + @Setup + public void init() { + monotonicClock = new MonotonicClockImpl().start(); + monotonicClockIdle = new MonotonicClockImpl(); + } + } + + @Threads(1) + @Benchmark + public void tick(BenchmarkParam param) { + param.monotonicClockIdle.tick(); + } + + @Benchmark + public long nanoTime(BenchmarkParam param) { + return param.monotonicClock.nanoTime(); + } + + @Benchmark + public long currentTimeMicros(BenchmarkParam param) { + return param.monotonicClock.currentTimeMicros(); + } + + @Benchmark + public long currentTimeMillis(BenchmarkParam param) { + return param.monotonicClock.currentTimeMillis(); + } + + @Benchmark + public Instant currentInstant(BenchmarkParam param) { + return param.monotonicClock.currentInstant(); + } + + @Benchmark + public long systemCurrentTimeMillis(BenchmarkParam param) { + return param.monotonicClock.systemCurrentTimeMillis(); + } + + @Benchmark + public long systemNanoTime(BenchmarkParam param) { + return param.monotonicClock.systemNanoTime(); + } +} diff --git a/persistence/nosql/idgen/impl/src/jmh/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorBench.java b/persistence/nosql/idgen/impl/src/jmh/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorBench.java new file mode 100644 index 0000000000..de0f790896 --- /dev/null +++ b/persistence/nosql/idgen/impl/src/jmh/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorBench.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.impl; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.polaris.ids.api.SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS; +import static org.apache.polaris.ids.api.SnowflakeIdGenerator.DEFAULT_SEQUENCE_BITS; +import static org.apache.polaris.ids.api.SnowflakeIdGenerator.DEFAULT_TIMESTAMP_BITS; + +import java.util.concurrent.atomic.AtomicLong; +import org.apache.polaris.ids.api.MonotonicClock; +import org.apache.polaris.ids.api.SnowflakeIdGenerator; +import org.apache.polaris.ids.spi.IdGeneratorSource; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@Warmup(iterations = 5, time = 1000, timeUnit = MILLISECONDS) +@Measurement(iterations = 5, time = 1000, timeUnit = MILLISECONDS) +@Fork(1) +@Threads(4) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(NANOSECONDS) +public class SnowflakeIdGeneratorBench { + @State(Scope.Benchmark) + public static class BenchmarkParam { + SnowflakeIdGenerator idGeneratorMonotonicClock; + SnowflakeIdGenerator idGeneratorMonotonicClockHugeSequence; + SnowflakeIdGenerator idGeneratorFakeClock; + MonotonicClock monotonicClock; + + @Setup + public void init() { + monotonicClock = MonotonicClockImpl.newDefaultInstance(); + + var idGeneratorSource = + new IdGeneratorSource() { + @Override + public int nodeId() { + return 1; + } + + @Override + public long currentTimeMillis() { + return monotonicClock.currentTimeMillis(); + } + }; + idGeneratorMonotonicClock = + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + DEFAULT_TIMESTAMP_BITS, + DEFAULT_SEQUENCE_BITS, + DEFAULT_NODE_ID_BITS, + monotonicClock.currentTimeMillis(), + idGeneratorSource); + + idGeneratorMonotonicClockHugeSequence = + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + DEFAULT_TIMESTAMP_BITS - 15, + DEFAULT_SEQUENCE_BITS + 23, + DEFAULT_NODE_ID_BITS - 8, + monotonicClock.currentTimeMillis(), + idGeneratorSource); + + var off = System.currentTimeMillis(); + var fakeClock = new AtomicLong(off); + idGeneratorFakeClock = + new SnowflakeIdGeneratorImpl( + DEFAULT_TIMESTAMP_BITS, + DEFAULT_SEQUENCE_BITS, + DEFAULT_NODE_ID_BITS, + off, + new IdGeneratorSource() { + @Override + public int nodeId() { + return 1; + } + + @Override + public long currentTimeMillis() { + return fakeClock.get(); + } + }) { + @Override + void spinWaitSequence() { + fakeClock.incrementAndGet(); + } + }; + } + } + + /** + * WARNING: This {@code generateIdMonotonicSource} benchmark relies indirectly on a real + * system clock via {@link MonotonicClockImpl} and is therefore not only slower because of hitting + * the OS clock but mostly because it spin-waits due to too many IDs are generated per + * millisecond. In other words: the times yielded by JMH MUST NOT be considered as + * runtimes in production, because it practically never happens that more than 4096 IDs are + * generated per millisecond. + */ + @Benchmark + public long generateIdMonotonicSourceSpinning(BenchmarkParam param) { + return param.idGeneratorMonotonicClock.generateId(); + } + + /** + * Snowflake ID generation against a generator configured with an extremely high number of + * sequence-bits for the sole purpose of benchmarking ID generation with an extremely low chance + * of spinning. + */ + @Benchmark + public long generateIdMonotonicSourceHugeSequence(BenchmarkParam param) { + return param.idGeneratorMonotonicClockHugeSequence.generateId(); + } + + /** */ + @Benchmark + @Measurement(iterations = 50, time = 900, timeUnit = MICROSECONDS) + @Threads(1) + public long generateIdMonotonicSourceNotSpinning(BenchmarkParam param) { + return param.idGeneratorMonotonicClock.generateId(); + } + + /** + * Artificial benchmark to just measure the ID generator without spinning (waiting for the "next" + * millisecond). + */ + @Benchmark + public long generateIdFakeClockSource(BenchmarkParam param) { + return param.idGeneratorFakeClock.generateId(); + } +} diff --git a/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/MonotonicClockImpl.java b/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/MonotonicClockImpl.java new file mode 100644 index 0000000000..786dc095e2 --- /dev/null +++ b/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/MonotonicClockImpl.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.impl; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import com.google.common.annotations.VisibleForTesting; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import org.apache.polaris.ids.api.MonotonicClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Monotonic clock implementation that leverages {@link System#nanoTime()} as the primary + * monotonically increasing time source, provided via {@link #nanoTime()}. {@link + * System#currentTimeMillis()} is used to provide a monotonically increasing wall clock provided via + * {@link #currentInstant()}, {@link #currentTimeMicros()} and {@link #currentTimeMillis()}. + * + *

The implementation starts a single "tick-thread" polling the wall clock to calculate the + * adjustment that is necessary to provide the values for {@code currentTime*()}. + * + *

Serving the current instant or "current time micros" however is a bit more complex, as the + * wall-clock source only has millisecond precision, but the instant has nanosecond precision. This + * means that the value returned for "current instant" needs to be created from the "nanosecond + * time" and involving an "adjustment" value. That "adjustment" is also updated by the tick-thread + * and represents the difference of the current nano-time and system wall clock, considering the + * fact that the system wall clock can go backwards or forwards or not being updated every + * millisecond. + * + *

This implementation expects that the wall clock in nanoseconds since epoch can be represented + * by the values in the range {@code 0 .. Long.MAX_VALUE}. This implementation must be + * adapted approaching the year 2262 (approx 292 years fit into this range). + * + *

Regarding "short-time" Thread.sleep() be aware of JDK-8306463 and JDK-8305092. + * + *

Even with very minimal sleep durations, the actual sleep time depends on the OS and in + * particular its scheduler. Sleep times have been measured to vary between some microseconds up to + * 2ms. + */ +@ApplicationScoped +@VisibleForTesting +@SuppressWarnings("FieldCanBeStatic") +public class MonotonicClockImpl implements MonotonicClock { + private static final Logger LOGGER = LoggerFactory.getLogger(MonotonicClockImpl.class); + + private volatile boolean stopTicker; + private volatile CountDownLatch tickerThreadLatch; + + // TODO should the implementation only advance the wall clock gradually? + // TODO protect against accidental huge wall clock advances? + // TODO should the implementation maybe never adjust to an advanced wall-clock? (i.e. faster wall + // clock than real time clock) + + // Best-effort to have the volatile fields not in the same cache line as the object header + + @SuppressWarnings("unused") + private final long _pad_1_0 = 0L; + + @SuppressWarnings("unused") + private final long _pad_1_1 = 0L; + + @SuppressWarnings("unused") + private final long _pad_1_2 = 0L; + + @SuppressWarnings("unused") + private final long _pad_1_3 = 0L; + + @SuppressWarnings("unused") + private final long _pad_1_4 = 0L; + + @SuppressWarnings("unused") + private final long _pad_1_5 = 0L; + + @SuppressWarnings("unused") + private final long _pad_1_6 = 0L; + + @SuppressWarnings("unused") + private final long _pad_1_7 = 0L; + + private volatile long adjustToWallClockAsNanos; + + @SuppressWarnings("unused") + private final long _pad_2_0 = 0L; + + @SuppressWarnings("unused") + private final long _pad_2_1 = 0L; + + @SuppressWarnings("unused") + private final long _pad_2_2 = 0L; + + @SuppressWarnings("unused") + private final long _pad_2_3 = 0L; + + @SuppressWarnings("unused") + private final long _pad_2_4 = 0L; + + @SuppressWarnings("unused") + private final long _pad_2_5 = 0L; + + @SuppressWarnings("unused") + private final long _pad_2_6 = 0L; + + @SuppressWarnings("unused") + private final long _pad_2_7 = 0L; + + private volatile long previousSystemNanoTime; + private static final AtomicLongFieldUpdater + PREVIOUS_SYSTEM_NANO_TIME_UPDATER = + AtomicLongFieldUpdater.newUpdater(MonotonicClockImpl.class, "previousSystemNanoTime"); + + @SuppressWarnings("resource") + @VisibleForTesting + public static MonotonicClock newDefaultInstance() { + return new MonotonicClockImpl().start(); + } + + // for CDI + + /** Default "production" constructor. */ + MonotonicClockImpl() { + setup(); + } + + protected void setup() { + var nowNanos = systemNanoTime(); + PREVIOUS_SYSTEM_NANO_TIME_UPDATER.set(this, nowNanos); + + var nowWallClockAsMillis = systemCurrentTimeMillis(); + + this.adjustToWallClockAsNanos = MILLISECONDS.toNanos(nowWallClockAsMillis) - nowNanos; + } + + /** Constructor for {@code MutableMonotonicClock}. */ + @SuppressWarnings("unused") + protected MonotonicClockImpl(boolean dummy) {} + + long currentTimeNanos() { + return currentTimeNanos(monotonicSystemNanoTime()); + } + + private long currentTimeNanos(long nanos) { + nanos += this.adjustToWallClockAsNanos; + return nanos; + } + + /** Called regularly to adjust to wall-clock drift, if the wall-clock adjust into the future. */ + @VisibleForTesting + protected void tick() { + var nowNanos = monotonicSystemNanoTime(); + var nowWallClockAsMillis = systemCurrentTimeMillis(); + + var expectedWallClockMillis = NANOSECONDS.toMillis(currentTimeNanos(nowNanos)); + var advancedInMillis = nowWallClockAsMillis - expectedWallClockMillis; + + // Only adjust if wall clock did not go backwards + if (advancedInMillis > 0) { + var adjustAsNanos = this.adjustToWallClockAsNanos; + this.adjustToWallClockAsNanos = adjustAsNanos + MILLISECONDS.toNanos(advancedInMillis); + + afterAdjust(); + + // log a if the system wall clock adjustment is quite a sudden bump for a production system + if (advancedInMillis > 200) { + var l = + advancedInMillis > 30_000 + ? LOGGER.atError() + : (advancedInMillis > 2_000 ? LOGGER.atWarn() : LOGGER.atInfo()); + l.log("System wall clock adjustment advanced by {}", Duration.ofMillis(advancedInMillis)); + } + } + } + + @SuppressWarnings("BusyWait") + private void ticker() { + try { + tickerThreadLatch = new CountDownLatch(1); + while (!stopTicker) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + // ignore + } + tick(); + } + } finally { + tickerThreadLatch.countDown(); + } + } + + @PostConstruct + void startForCDI() { + checkState(!stopTicker, "Already started"); + + var t = new Thread(this::ticker, "Monotonic Clock Thread"); + t.setDaemon(true); + t.start(); + } + + protected MonotonicClockImpl start() { + startForCDI(); + return this; + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + @Override + @PreDestroy + public void close() { + stopTicker = true; + var t = tickerThreadLatch; + if (t != null) { + try { + t.await(1, MINUTES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + tickerThreadLatch = null; + } + } + } + + @Override + public long currentTimeMicros() { + return NANOSECONDS.toMicros(currentTimeNanos()); + } + + @Override + public long currentTimeMillis() { + return NANOSECONDS.toMillis(currentTimeNanos()); + } + + @Override + public Instant currentInstant() { + var adjustedNanos = currentTimeNanos(); + + var seconds = NANOSECONDS.toSeconds(adjustedNanos); + var nanoPart = adjustedNanos % SECONDS.toNanos(1); + + return Instant.ofEpochSecond(seconds, nanoPart); + } + + @Override + public long nanoTime() { + return monotonicSystemNanoTime(); + } + + @Override + public void sleepMillis(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void waitUntilTimeMillisAdvanced() { + var start = currentTimeMillis(); + var now = 0L; + do { + try { + // The minimum interval is (at least up to Java 23 on Linux) is the time it takes the OS + // scheduler to switch tasks. That time is way higher than one nanosecond. + Thread.sleep(0, 1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + now = currentTimeMillis(); + } while (start == now); + } + + // Overridden by tests + @VisibleForTesting + protected void afterAdjust() {} + + // Overridden by tests + @VisibleForTesting + protected long systemCurrentTimeMillis() { + return System.currentTimeMillis(); + } + + // Overridden by tests + @VisibleForTesting + protected long systemNanoTime() { + return System.nanoTime(); + } + + /** + * {@link System#nanoTime() System.nanoTime()} does not guarantee that the values will be + * monotonically increasing when invocations happen from different CPUs/cores. + * + *

This function guarantees that the returned value is always equal to or greater than the last + * returned value. + * + *

Adding a "simple unit test" for this function is extremely tricky, because every + * synchronization added to a test "breaks" real concurrency, which is however what needs to be + * tested. + */ + private long monotonicSystemNanoTime() { + while (true) { + var nanos = systemNanoTime(); + var last = PREVIOUS_SYSTEM_NANO_TIME_UPDATER.get(this); + var diff = nanos - last; + // Attention! 'diff' can be negative! + if (diff > 0L) { + if (PREVIOUS_SYSTEM_NANO_TIME_UPDATER.compareAndSet(this, last, nanos)) { + return nanos; + } + } else if (diff == 0L) { + return nanos; + } + Thread.onSpinWait(); + } + } +} diff --git a/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorFactory.java b/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorFactory.java new file mode 100644 index 0000000000..aece3a184f --- /dev/null +++ b/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorFactory.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.impl; + +import static org.apache.polaris.ids.impl.SnowflakeIdGeneratorImpl.validateArguments; + +import java.time.Instant; +import java.util.Map; +import org.apache.polaris.ids.api.SnowflakeIdGenerator; +import org.apache.polaris.ids.spi.IdGeneratorFactory; +import org.apache.polaris.ids.spi.IdGeneratorSource; + +public class SnowflakeIdGeneratorFactory implements IdGeneratorFactory { + @Override + public void validateParameters(Map params, IdGeneratorSource idGeneratorSource) { + int timestampBits = + Integer.parseInt( + params.getOrDefault( + "timestamp-bits", "" + SnowflakeIdGenerator.DEFAULT_TIMESTAMP_BITS)); + int nodeIdBits = + Integer.parseInt( + params.getOrDefault("node-id-bits", "" + SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS)); + int sequenceBits = + Integer.parseInt( + params.getOrDefault("sequence-bits", "" + SnowflakeIdGenerator.DEFAULT_SEQUENCE_BITS)); + var idEpochMillis = SnowflakeIdGenerator.ID_EPOCH_MILLIS; + var idEpochMillisFromParams = params.get("offset"); + if (idEpochMillisFromParams != null) { + idEpochMillis = Instant.parse(idEpochMillisFromParams).toEpochMilli(); + } + + validateArguments(timestampBits, sequenceBits, nodeIdBits, idEpochMillis, idGeneratorSource); + } + + @Override + public SnowflakeIdGenerator buildSystemIdGenerator(Map params) { + return buildIdGenerator( + params, + new IdGeneratorSource() { + @Override + public int nodeId() { + return 0; + } + + @Override + public long currentTimeMillis() { + return SnowflakeIdGenerator.ID_EPOCH_MILLIS; + } + }); + } + + @Override + public SnowflakeIdGenerator buildIdGenerator( + Map params, IdGeneratorSource idGeneratorSource) { + int timestampBits = + Integer.parseInt( + params.getOrDefault( + "timestamp-bits", "" + SnowflakeIdGenerator.DEFAULT_TIMESTAMP_BITS)); + int nodeIdBits = + Integer.parseInt( + params.getOrDefault("node-id-bits", "" + SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS)); + int sequenceBits = + Integer.parseInt( + params.getOrDefault("sequence-bits", "" + SnowflakeIdGenerator.DEFAULT_SEQUENCE_BITS)); + // ATCFIX - This name is incorrect. + var idEpochMillis = SnowflakeIdGenerator.ID_EPOCH_MILLIS; + var offset = params.get("offset"); + if (offset != null) { + idEpochMillis = Instant.parse(offset).toEpochMilli(); + } + + return buildIdGenerator( + timestampBits, sequenceBits, nodeIdBits, idEpochMillis, idGeneratorSource); + } + + public SnowflakeIdGenerator buildIdGenerator( + int timestampBits, + int sequenceBits, + int nodeIdBits, + long offsetMillis, + IdGeneratorSource idGeneratorSource) { + return new SnowflakeIdGeneratorImpl( + timestampBits, sequenceBits, nodeIdBits, offsetMillis, idGeneratorSource); + } + + @Override + public String name() { + return "snowflake"; + } +} diff --git a/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorImpl.java b/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorImpl.java new file mode 100644 index 0000000000..a2727a30d1 --- /dev/null +++ b/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorImpl.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.lang.String.format; + +import com.google.common.annotations.VisibleForTesting; +import jakarta.annotation.Nonnull; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import org.apache.polaris.ids.api.MonotonicClock; +import org.apache.polaris.ids.api.SnowflakeIdGenerator; +import org.apache.polaris.ids.spi.IdGeneratorSource; + +/** + * Implementation of a local, per-node generator for so-called "snowflake IDs", which are unique + * integer IDs in a distributed environment. + * + *

A monotonically increasing clock is strictly required. Invocations of {@link + * #generateId()} fail hard, if the clock walks backwards, which means it returns a lower value than + * before. It is recommended to use an implementation of {@link MonotonicClock} as the clock source. + * + *

The implementation is thread-safe. + * + *

Reference: Article + * on medium.com, Twitter + * GitHub repository (archived) + */ +class SnowflakeIdGeneratorImpl implements SnowflakeIdGenerator { + + // TODO add a specialized implementation using hard-coded values for the standardized parameters + + private static final AtomicLongFieldUpdater LAST_ID_UPDATER = + AtomicLongFieldUpdater.newUpdater(SnowflakeIdGeneratorImpl.class, "lastId"); + + private final IdGeneratorSource idGeneratorSource; + + // Used in hot generateId() + private volatile long lastId; + private final long idEpoch; + private final long timestampMax; + private final int timestampShift; + private final int sequenceBits; + private final long sequenceMask; + private final long nodeMask; + + SnowflakeIdGeneratorImpl(IdGeneratorSource idGeneratorSource) { + this( + DEFAULT_TIMESTAMP_BITS, + DEFAULT_SEQUENCE_BITS, + DEFAULT_NODE_ID_BITS, + ID_EPOCH_MILLIS, + idGeneratorSource); + } + + SnowflakeIdGeneratorImpl( + int timestampBits, + int sequenceBits, + int nodeBits, + long idEpoch, + IdGeneratorSource idGeneratorSource) { + validateArguments(timestampBits, sequenceBits, nodeBits, idEpoch, idGeneratorSource); + this.timestampShift = sequenceBits + nodeBits; + this.timestampMax = 1L << timestampBits; + this.nodeMask = (1L << nodeBits) - 1; + this.sequenceBits = sequenceBits; + this.sequenceMask = (1L << sequenceBits) - 1; + this.idEpoch = idEpoch; + this.idGeneratorSource = idGeneratorSource; + } + + static void validateArguments( + int timestampBits, + int sequenceBits, + int nodeBits, + long idEpochMillis, + IdGeneratorSource idGeneratorSource) { + var nowMillis = idGeneratorSource != null ? idGeneratorSource.currentTimeMillis() : -1; + var now = Instant.ofEpochMilli(nowMillis); + var timestampMax = 1L << timestampBits; + checkArgs( + () -> checkArgument(idGeneratorSource != null, "IdGeneratorSource must not be null"), + () -> + checkArgument( + nowMillis >= idEpochMillis, + "Clock returns a timestamp %s less than the configured epoch %s", + now, + Instant.ofEpochMilli(idEpochMillis)), + () -> + checkArgument( + nowMillis - idEpochMillis < timestampMax, + "Clock already returns a timestamp %s greater of after %s", + now, + Instant.ofEpochMilli(timestampMax)), + () -> + checkArgument( + nodeBits >= 2 + && sequenceBits >= 5 + && timestampBits >= 5 // this is REALLY low ! + && nodeBits < 64 + && sequenceBits < 64 + && timestampBits < 64, + "value of nodeBits %s or sequenceBits %s or timestampBits %s is too low or too high", + nodeBits, + sequenceBits, + timestampBits), + () -> + checkArgument( + timestampBits + nodeBits + sequenceBits == 63, + "Sum of timestampBits + nodeBits + sequenceBits must be == 63"), + () -> { + if (idGeneratorSource != null) { + var nodeId = idGeneratorSource.nodeId(); + var nodeMax = 1L << nodeBits; + checkArgument( + nodeId >= 0 && nodeId < nodeMax, "nodeId %s out of range [0..%s[", nodeId, nodeMax); + } + }); + } + + static void checkArgs(Runnable... checks) { + var violations = new ArrayList(); + for (Runnable check : checks) { + try { + check.run(); + } catch (IllegalArgumentException iae) { + violations.add(iae.getMessage()); + } + } + if (!violations.isEmpty()) { + throw new IllegalArgumentException(String.join(", ", violations)); + } + } + + @Override + public long systemIdForNode(int nodeId) { + return constructIdUnsafe(timestampMax - 1, 0, nodeId); + } + + private long constructIdUnsafe(long timestamp, long sequence, long nodeId) { + return (timestamp << timestampShift) | (nodeId << sequenceBits) | sequence; + } + + @Override + public long constructId(long timestamp, long sequence, long nodeId) { + checkArgument( + (timestamp & (timestampMax - 1)) != timestampMax - 1, + "timestamp argument %s out of range", + timestamp); + checkArgument( + (sequence & sequenceMask) == sequence, "sequence argument %s out of range", sequence); + checkArgument((nodeId & nodeMask) == nodeId, "nodeId argument %s out of range", nodeId); + return constructIdUnsafe(timestamp, sequence, nodeId); + } + + @Override + public long generateId() { + var nodeId = idGeneratorSource.nodeId(); + checkState(nodeId >= 0, "Cannot generate a new ID, shutting down?"); + var nodeIdPattern = ((long) nodeId) << sequenceBits; + + var needTimestamp = true; + var timestamp = 0L; + + while (true) { + var last = LAST_ID_UPDATER.get(this); + var lastTimestamp = timestampFromId(last); + + if (needTimestamp || timestamp < lastTimestamp) { + // MUST query the clock AFTER fetching 'lastId', otherwise a concurrent thread might update + // 'lastId' with a newer clock value and the monotonic-clock-source check would fail. + timestamp = idGeneratorSource.currentTimeMillis() - idEpoch; + checkState( + timestamp < timestampMax, + "Cannot generate any more IDs as the lifetime of the generator has expired"); + if (timestamp < lastTimestamp) { + throw new IllegalStateException( + "Clock walked backwards from " + + lastTimestamp + + " to " + + timestamp + + ", provide a monotonically increasing clock source"); + } + needTimestamp = false; + } + + long sequence; + if (lastTimestamp == timestamp) { + sequence = sequenceFromId(last); + if (sequence == sequenceMask) { + // last generated sequence for the current millisecond yielded the maximum value, + // spin-wait until the next millisecond + spinWaitSequence(); + // Force re-fetching the timestamp + needTimestamp = true; + continue; + } + sequence++; + } else { + sequence = 0L; + } + + holdForTest(); + + var id = (timestamp << timestampShift) | nodeIdPattern | sequence; + + if (LAST_ID_UPDATER.compareAndSet(this, last, id)) { + return id; + } + + spinWaitRace(); + // Do not re-fetch the timestamp from the clock source (a bit faster) + } + } + + @VisibleForTesting + void holdForTest() {} + + @VisibleForTesting + void spinWaitSequence() { + try { + // Sleep for 0.5ms - no Thread.yield() or Thread.onSpinWait(), because those cause too much + // CPU load + Thread.sleep(0, 500_000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @VisibleForTesting + void spinWaitRace() { + Thread.onSpinWait(); + } + + @Override + public long timestampFromId(long id) { + return id >>> timestampShift; + } + + @Override + public long timestampUtcFromId(long id) { + return timestampFromId(id) + idEpoch; + } + + @Override + public long sequenceFromId(long id) { + return id & sequenceMask; + } + + @Override + public long nodeFromId(long id) { + return (id >>> sequenceBits) & nodeMask; + } + + @Override + public UUID idToTimeUuid(long id) { + var timestamp = timestampFromId(id); + var sequence = sequenceFromId(id); + var node = nodeFromId(id); + + return new UUID(timeUuidMsb(timestamp), timeUuidLsb(sequence, node)); + } + + @Override + public long timeUuidToId(@Nonnull UUID uuid) { + checkArgument( + uuid.variant() == 2 && uuid.version() == 1, "Must be a version 1 / variant 2 UUID"); + var ts = uuid.timestamp() - idEpoch; + var seq = uuid.clockSequence(); + var node = uuid.node(); + checkArgument( + ts > 0 + && ts <= timestampMax + && seq >= 0 + && seq <= sequenceMask + && node >= 0 + && node <= nodeMask, + "TimeUUID contains values that cannot be condensed into a snowflake-ID"); + return constructId(ts, seq, node); + } + + @Override + public String describeId(long id) { + var ts = timestampFromId(id); + var seq = sequenceFromId(id); + var node = nodeFromId(id); + var tsUnixEpoch = ts + idEpoch; + var instant = Instant.ofEpochMilli(tsUnixEpoch); + var zone = ZoneOffset.systemDefault(); + var local = LocalDateTime.ofInstant(instant, zone); + return format( + """ + Snowflake-ID %d components + timestamp : %d + node : %d%s + sequence : %d + timestamp/Unix : %d (= timestamp + epoch offset) + timestamp/instant : %s + timestamp/local : %s %s + generator offset : %d / %s + """, + id, + ts, + node, + (ts == 0L && seq == 0L) ? " (system ID for this node)" : "", + seq, + tsUnixEpoch, + instant, + local, + zone, + idEpoch, + Instant.ofEpochMilli(idEpoch)); + } + + @Override + public int timestampBits() { + return Long.numberOfTrailingZeros(timestampMax); + } + + @Override + public int sequenceBits() { + return sequenceBits; + } + + @Override + public int nodeIdBits() { + return 64 - Long.numberOfLeadingZeros(nodeMask); + } + + @Override + public String idToString(long id) { + var ts = timestampFromId(id); + return Instant.ofEpochMilli(ts + idEpoch).toString() + + " (" + + ts + + "), sequence " + + sequenceFromId(id) + + ", node " + + nodeFromId(id); + } + + @VisibleForTesting + static long timeUuidLsb(long sequence, long node) { + // LSB: + // 0xC000000000000000 variant + // 0x3FFF000000000000 clock_seq + // 0x0000FFFFFFFFFFFF node + + return + // variant + 0x8000000000000000L + // clock_seq + | ((sequence << 48) & 0x3FFF000000000000L) + // node + | (node & 0x0000FFFFFFFFFFFFL); + } + + @VisibleForTesting + private long timeUuidMsb(long timestamp) { + return timeUuidMsbReal(timestamp + idEpoch); + } + + @VisibleForTesting + static long timeUuidMsbReal(long timestamp) { + // MSB: + // 0xFFFFFFFF00000000 time_low + // 0x00000000FFFF0000 time_mid + // 0x000000000000F000 version + // 0x0000000000000FFF time_hi + + return + // time_low + (timestamp << 32 & 0xFFFFFFFF00000000L) + | + // time_mid + ((timestamp >>> (32 - 16) & 0x00000000FFFF0000L)) + | + // version + 0x0000000000001000L + | + // time_hi + ((timestamp >>> 48) & 0x0000000000000FFFL); + } +} diff --git a/persistence/nosql/idgen/impl/src/main/resources/META-INF/beans.xml b/persistence/nosql/idgen/impl/src/main/resources/META-INF/beans.xml new file mode 100644 index 0000000000..a297f1aa53 --- /dev/null +++ b/persistence/nosql/idgen/impl/src/main/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/persistence/nosql/idgen/impl/src/main/resources/META-INF/services/org.apache.polaris.ids.spi.IdGeneratorFactory b/persistence/nosql/idgen/impl/src/main/resources/META-INF/services/org.apache.polaris.ids.spi.IdGeneratorFactory new file mode 100644 index 0000000000..7cfb95353d --- /dev/null +++ b/persistence/nosql/idgen/impl/src/main/resources/META-INF/services/org.apache.polaris.ids.spi.IdGeneratorFactory @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.polaris.ids.impl.SnowflakeIdGeneratorFactory diff --git a/persistence/nosql/idgen/impl/src/test/java/org/apache/polaris/ids/impl/TestMonotonicClockImpl.java b/persistence/nosql/idgen/impl/src/test/java/org/apache/polaris/ids/impl/TestMonotonicClockImpl.java new file mode 100644 index 0000000000..ef043d76ef --- /dev/null +++ b/persistence/nosql/idgen/impl/src/test/java/org/apache/polaris/ids/impl/TestMonotonicClockImpl.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.impl; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.time.Instant; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.polaris.ids.api.MonotonicClock; +import org.apache.polaris.ids.mocks.MutableMonotonicClock; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@ExtendWith(SoftAssertionsExtension.class) +@Timeout(value = 5, unit = MINUTES) +public class TestMonotonicClockImpl { + @InjectSoftAssertions protected SoftAssertions soft; + + @Test + public void simple() { + var systemNow = System.currentTimeMillis(); + + try (var monotonicClock = + new MutableMonotonicClock(systemNow, MILLISECONDS.toNanos(systemNow))) { + monotonicClock.start(); + soft.assertThat(monotonicClock.currentTimeMillis()).isEqualTo(systemNow); + } + } + + @Test + public void realClockMustNotGoBackwards() { + try (var monotonicClock = new MonotonicClockImpl()) { + monotonicClock.start(); + + var lastMicros = monotonicClock.currentTimeMicros(); + var lastMillis = monotonicClock.currentTimeMillis(); + var lastInstant = monotonicClock.currentInstant(); + var lastNanos = monotonicClock.systemNanoTime(); + + // Run for 5 seconds so there is a real chance to catch a couple of "second wraps" and + // wall-clock changes. + var endAfter = lastNanos + SECONDS.toNanos(5); + + while (true) { + + var nanos = monotonicClock.systemNanoTime(); + soft.assertThat(nanos).isGreaterThanOrEqualTo(lastNanos); + + var micros = monotonicClock.currentTimeMicros(); + soft.assertThat(micros).isGreaterThanOrEqualTo(lastMicros); + + var millis = monotonicClock.currentTimeMillis(); + soft.assertThat(millis).isGreaterThanOrEqualTo(lastMillis); + + var instant = monotonicClock.currentInstant(); + soft.assertThat(instant).isAfterOrEqualTo(lastInstant); + + soft.assertAll(); + + lastMicros = micros; + lastMillis = millis; + lastInstant = instant; + lastNanos = nanos; + + if (nanos > endAfter) { + break; + } + } + } + } + + @ParameterizedTest + @ValueSource( + longs = { + 0L, + 500L, + 1_000L, + 1_000_000L, + 1_000_000_000L, + // wrap around to negative + Long.MAX_VALUE, + // wrap around to negative after 50ms + Long.MAX_VALUE - 50_000_000L, + // wrap around to negative after 150ms + Long.MAX_VALUE, + // "negative for" 50ms + -50_000_000L, + // "negative for" 150ms + -150_000_000L, + // "negative for" 500ms + -500_000_000L, + // "negative for" 500s + -500_00_000_000L, + // always negative + Long.MIN_VALUE + 100_000_000_000L, + Long.MIN_VALUE + 100_000_000L, + Long.MIN_VALUE + 100_000L, + Long.MIN_VALUE + 1000L, + Long.MIN_VALUE + 500L, + Long.MIN_VALUE + 1, + Long.MIN_VALUE + }) + public void nanoSourceNegativePositive(long nanoOffset) { + var nano = nanoOffset; + var systemWall = 0L; + var realTimeWall = 0L; + var inst = Instant.EPOCH; + + // MonotonicClockImpl not started, no need to close() + @SuppressWarnings("resource") + var monotonicClock = new MutableMonotonicClock(systemWall, nano); + + soft.assertThat(monotonicClock) + .extracting( + MonotonicClock::currentTimeMillis, + MonotonicClock::currentTimeMicros, + MonotonicClock::currentInstant, + MonotonicClock::nanoTime) + .containsExactly(realTimeWall, instantToMicros(inst), inst, nano); + + monotonicClock.tick(); + + soft.assertThat(monotonicClock) + .extracting( + MonotonicClock::currentTimeMillis, + MonotonicClock::currentTimeMicros, + MonotonicClock::currentInstant, + MonotonicClock::nanoTime) + .containsExactly(realTimeWall, instantToMicros(inst), inst, nano); + + monotonicClock.tick(); + + // -- wall clock too slow + // Wall clock advanced by 100ms + // Real time advanced by 200ms + realTimeWall = 200; + nano = nanoOffset + MILLISECONDS.toNanos(realTimeWall) + 123456L; + systemWall = 100; + inst = Instant.ofEpochSecond(0, MILLISECONDS.toNanos(realTimeWall) + 123456L); + monotonicClock.setNanoTime(nano); + monotonicClock.setCurrentTimeMillis(systemWall); + + monotonicClock.tick(); + + soft.assertThat(monotonicClock) + .extracting( + MonotonicClock::currentTimeMillis, + MonotonicClock::currentTimeMicros, + MonotonicClock::currentInstant, + MonotonicClock::nanoTime) + .containsExactly(realTimeWall, instantToMicros(inst), inst, nano); + + realTimeWall = 1400; + nano = nanoOffset + MILLISECONDS.toNanos(realTimeWall) + 234567L; + systemWall = 1400; + inst = Instant.ofEpochSecond(0, MILLISECONDS.toNanos(1400) + 234567L); + monotonicClock.setNanoTime(nano); + monotonicClock.setCurrentTimeMillis(systemWall); + + monotonicClock.tick(); + + soft.assertThat(monotonicClock) + .extracting( + MonotonicClock::currentTimeMillis, + MonotonicClock::currentTimeMicros, + MonotonicClock::currentInstant, + MonotonicClock::nanoTime) + .containsExactly(realTimeWall, instantToMicros(inst), inst, nano); + + // wall clock goes backwards + // wall = 200; + monotonicClock.setNanoTime(nano); + systemWall = 1000; + monotonicClock.setCurrentTimeMillis(systemWall); + + monotonicClock.tick(); + + soft.assertThat(monotonicClock) + .extracting( + MonotonicClock::currentTimeMillis, + MonotonicClock::currentTimeMicros, + MonotonicClock::currentInstant, + MonotonicClock::nanoTime) + .containsExactly(realTimeWall, instantToMicros(inst), inst, nano); + + // wall clock advances + realTimeWall = 2000; + systemWall = 2000; + nano = nanoOffset + MILLISECONDS.toNanos(realTimeWall) + 234567L; + inst = Instant.ofEpochSecond(0, MILLISECONDS.toNanos(realTimeWall) + 234567L); + monotonicClock.setNanoTime(nano); + monotonicClock.setCurrentTimeMillis(systemWall); + + monotonicClock.tick(); + + soft.assertThat(monotonicClock) + .extracting( + MonotonicClock::currentTimeMillis, + MonotonicClock::currentTimeMicros, + MonotonicClock::currentInstant, + MonotonicClock::nanoTime) + .containsExactly(realTimeWall, instantToMicros(inst), inst, nano); + } + + @Test + public void currentInstantAndMillis() { + // Note: this test case emits FAKE "MonotonicClock tick loop is stalled" warnings! + // Ignore those warnings. + + // MonotonicClockImpl not started, no need to close() + @SuppressWarnings("resource") + var monotonicClock = new MutableMonotonicClock(0L, 0L); + + monotonicClock.tick(); + + soft.assertThat(monotonicClock.systemNanoTime()).isEqualTo(0L); + soft.assertThat(monotonicClock.currentTimeMicros()).isEqualTo(0L); + soft.assertThat(monotonicClock.currentTimeMillis()).isEqualTo(0L); + soft.assertThat(monotonicClock.currentInstant().toEpochMilli()).isEqualTo(0L); + + var nanos = 456111222333L; + var millis = TimeUnit.NANOSECONDS.toMillis(nanos); + monotonicClock.setCurrentTimeMillis(millis); + monotonicClock.setNanoTime(nanos); + + monotonicClock.tick(); + + soft.assertThat(monotonicClock.systemNanoTime()).isEqualTo(nanos); + soft.assertThat(monotonicClock.currentTimeMicros()) + .extracting(MICROSECONDS::toMillis) + .isEqualTo(millis); + soft.assertThat(monotonicClock.currentTimeMillis()).isEqualTo(millis); + soft.assertThat(monotonicClock.currentInstant().toEpochMilli()).isEqualTo(millis); + } + + @Test + public void strictlyMonotonicIfWallClockGoesBackwards() { + var adjustCalled = new AtomicBoolean(); + + // MonotonicClockImpl not started, no need to close() + @SuppressWarnings("resource") + var monotonicClock = + new MutableMonotonicClock() { + @Override + protected void afterAdjust() { + adjustCalled.set(true); + } + }; + + var initial = monotonicClock.currentTimeMillis(); + + // Begin --------------------------------------------------------------------- + // + // Check that the monotonic clock advances + + adjustCalled.set(false); + + // Increment the nano-clock source by 1 millisecond + monotonicClock.advanceNanos(1, MILLISECONDS); + + monotonicClock.tick(); + + // Test case: + var afterWork1 = monotonicClock.currentTimeMillis(); + soft.assertThat(adjustCalled).isFalse(); + soft.assertThat(afterWork1).isGreaterThan(initial); + + // < + new SnowflakeIdGeneratorFactory() + .buildIdGenerator(Map.of(), idGeneratorSource(0, clock::currentTimeMillis))) + .doesNotThrowAnyException(); + soft.assertThatCode( + () -> + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + Map.of(), + idGeneratorSource( + (1 << DEFAULT_NODE_ID_BITS) - 1, clock::currentTimeMillis))) + .doesNotThrowAnyException(); + } + + @Test + public void invalidArgs() { + var validClock = (LongSupplier) () -> ID_EPOCH_MILLIS; + + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + new SnowflakeIdGeneratorFactory() + .buildIdGenerator(Map.of(), idGeneratorSource(-1, () -> 0L))) + .withMessage( + "Clock returns a timestamp 1970-01-01T00:00:00Z less than the configured epoch 2025-03-01T00:00:00Z, nodeId -1 out of range [0..1024["); + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + new SnowflakeIdGeneratorFactory() + .buildIdGenerator(Map.of(), idGeneratorSource(-1, validClock))) + .withMessage("nodeId -1 out of range [0..1024["); + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + Map.of(), idGeneratorSource(1 << DEFAULT_NODE_ID_BITS, validClock))) + .withMessage("nodeId 1024 out of range [0..1024["); + + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + 31, + DEFAULT_SEQUENCE_BITS, + DEFAULT_NODE_ID_BITS, + ID_EPOCH_MILLIS, + ID_GENERATOR_SOURCE_CONSTANT)) + .withMessage("Sum of timestampBits + nodeBits + sequenceBits must be == 63"); + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + DEFAULT_TIMESTAMP_BITS, + 4, + DEFAULT_NODE_ID_BITS, + ID_EPOCH_MILLIS, + ID_GENERATOR_SOURCE_CONSTANT)) + .withMessage( + "value of nodeBits 10 or sequenceBits 4 or timestampBits 41 is too low or too high, Sum of timestampBits + nodeBits + sequenceBits must be == 63"); + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + DEFAULT_TIMESTAMP_BITS, + DEFAULT_SEQUENCE_BITS, + 4, + ID_EPOCH_MILLIS, + ID_GENERATOR_SOURCE_CONSTANT)) + .withMessage("Sum of timestampBits + nodeBits + sequenceBits must be == 63"); + + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + 64, + DEFAULT_SEQUENCE_BITS, + DEFAULT_NODE_ID_BITS, + ID_EPOCH_MILLIS, + ID_GENERATOR_SOURCE_CONSTANT)) + .withMessage( + "value of nodeBits 10 or sequenceBits 12 or timestampBits 64 is too low or too high, Sum of timestampBits + nodeBits + sequenceBits must be == 63"); + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + DEFAULT_TIMESTAMP_BITS, + 64, + DEFAULT_NODE_ID_BITS, + ID_EPOCH_MILLIS, + ID_GENERATOR_SOURCE_CONSTANT)) + .withMessage( + "value of nodeBits 10 or sequenceBits 64 or timestampBits 41 is too low or too high, Sum of timestampBits + nodeBits + sequenceBits must be == 63"); + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + DEFAULT_TIMESTAMP_BITS, + DEFAULT_SEQUENCE_BITS, + 64, + ID_EPOCH_MILLIS, + ID_GENERATOR_SOURCE_CONSTANT)) + .withMessage( + "value of nodeBits 64 or sequenceBits 12 or timestampBits 41 is too low or too high, Sum of timestampBits + nodeBits + sequenceBits must be == 63"); + + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + DEFAULT_TIMESTAMP_BITS, + 5, + DEFAULT_NODE_ID_BITS, + ID_EPOCH_MILLIS, + ID_GENERATOR_SOURCE_CONSTANT)) + .withMessage("Sum of timestampBits + nodeBits + sequenceBits must be == 63"); + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + DEFAULT_TIMESTAMP_BITS, + 4, + 18, + ID_EPOCH_MILLIS, + ID_GENERATOR_SOURCE_CONSTANT)) + .withMessage( + "value of nodeBits 18 or sequenceBits 4 or timestampBits 41 is too low or too high"); + } + + @Test + public void clockBackwards() { + var clock = new AtomicLong(ID_EPOCH_MILLIS + 100); + + var nodeId = 42; + + var ids = new HashSet(); + + var impl = + new SnowflakeIdGeneratorFactory() + .buildIdGenerator(Map.of(), idGeneratorSource(nodeId, clock::get)); + + soft.assertThatCode(() -> soft.assertThat(ids.add(impl.generateId())).isTrue()) + .doesNotThrowAnyException(); + soft.assertThatCode(() -> soft.assertThat(ids.add(impl.generateId())).isTrue()) + .doesNotThrowAnyException(); + clock.addAndGet(20); + soft.assertThatCode(() -> soft.assertThat(ids.add(impl.generateId())).isTrue()) + .doesNotThrowAnyException(); + soft.assertThatCode(() -> soft.assertThat(ids.add(impl.generateId())).isTrue()) + .doesNotThrowAnyException(); + clock.addAndGet(-1); + soft.assertThatIllegalStateException() + .isThrownBy(impl::generateId) + .withMessage( + "Clock walked backwards from 120 to 119, provide a monotonically increasing clock source"); + clock.addAndGet(1); + soft.assertThatCode(() -> soft.assertThat(ids.add(impl.generateId())).isTrue()) + .doesNotThrowAnyException(); + } + + /** + * Tests concurrency in {@link SnowflakeIdGeneratorImpl}, forcing CAS update races on the {@code + * lastId} field and asserting that {@link SnowflakeIdGeneratorImpl#spinWaitRace()} has been + * called. + */ + @Test + @Timeout(value = 5, unit = MINUTES) + public void concurrency() throws Exception { + var clock = new AtomicLong(ID_EPOCH_MILLIS + 100); + + var nodeId = 42; + + var holdEnterT1 = new Semaphore(0); + var holdEnterT2 = new Semaphore(0); + var holdWaitT1 = new Semaphore(0); + var holdWaitT2 = new Semaphore(0); + var spinEnterT2 = new Semaphore(0); + var spinWaitT2 = new Semaphore(0); + + var syncGenerate = new CountDownLatch(1); + var syncMain = new CountDownLatch(2); + var thread1 = new AtomicReference(); + var impl = + new SnowflakeIdGeneratorImpl(idGeneratorSource(nodeId, clock::get)) { + @Override + void holdForTest() { + var isT1 = thread1.get() == Thread.currentThread(); + + (isT1 ? holdEnterT1 : holdEnterT2).release(); + (isT1 ? holdWaitT1 : holdWaitT2).acquireUninterruptibly(); + } + + @Override + void spinWaitRace() { + var isT1 = thread1.get() == Thread.currentThread(); + + checkState(!isT1); + + spinEnterT2.release(); + spinWaitT2.acquireUninterruptibly(); + } + }; + + try (var executorService = Executors.newFixedThreadPool(2)) { + + var t1 = + executorService.submit( + () -> { + thread1.set(Thread.currentThread()); + + syncMain.countDown(); + syncGenerate.await(); + + return impl.generateId(); + }); + + var t2 = + executorService.submit( + () -> { + syncMain.countDown(); + syncGenerate.await(); + + return impl.generateId(); + }); + + // Wait for both threads to reach the same state + syncMain.await(); + // Let both threads continue + syncGenerate.countDown(); + + // Wait until both threads reached the "holdForTest()" function after loading the 'lastId' + // field with the same value + holdEnterT1.acquireUninterruptibly(); + holdEnterT2.acquireUninterruptibly(); + + // Let T1 continue (and finish) - updates 'lastId' field + holdWaitT1.release(); + // Get T1's result + var id1 = t1.get(); + + // Let T2 continue - it will race updating the 'lastId' field + holdWaitT2.release(); + + // Wait until T2 enters "spinWait()" + spinEnterT2.acquireUninterruptibly(); + // Let T2 retry + spinWaitT2.release(); + + // Wait until T2 reached "holdForTest()" after hitting the 'lastId' update race + holdEnterT2.acquireUninterruptibly(); + // Let T2 continue - next iteration should not race + holdWaitT2.release(); + + var id2 = t2.get(); + + soft.assertThat(id1).isNotEqualTo(id2); + } + } + + @Test + public void manyThreads() throws Exception { + var threads = Runtime.getRuntime().availableProcessors() * 2; + var nodeId = 42; + var impl = + new SnowflakeIdGeneratorFactory() + .buildIdGenerator(Map.of(), idGeneratorSource(nodeId, clock::currentTimeMillis)); + + var sync = new CountDownLatch(threads); + var start = new CountDownLatch(1); + var done = new CountDownLatch(threads); + var finish = new CountDownLatch(1); + var numIdsPerThread = 5000; + + try (var executorService = Executors.newFixedThreadPool(threads)) { + var ids = ConcurrentHashMap.newKeySet(numIdsPerThread * threads * 2); + var futures = new ArrayList>(threads); + + for (int i = 0; i < threads; i++) { + futures.add( + executorService.submit( + () -> { + sync.countDown(); + start.await(); + + var localIds = new HashSet(numIdsPerThread * 2); + try { + for (int n = 0; n < numIdsPerThread; n++) { + localIds.add(impl.generateId()); + } + } finally { + done.countDown(); + finish.await(); + } + + ids.addAll(localIds); + + return null; + })); + } + + // Wait until all threads have started + sync.await(); + // Let threads start + start.countDown(); + + // Wait until all threads have started + done.await(); + // Let threads start + finish.countDown(); + + for (Future future : futures) { + future.get(); + } + + soft.assertThat(ids).hasSize(numIdsPerThread * threads); + } + } + + @Test + public void maxIdsPerMillisecondAtEpochOffset() { + var clock = (LongSupplier) () -> ID_EPOCH_MILLIS; + + var nodeId = 42; + + var impl = + new SnowflakeIdGeneratorImpl(idGeneratorSource(nodeId, clock)) { + @Override + void spinWaitSequence() { + throw new RuntimeException("Moo"); + } + + @Override + void spinWaitRace() { + throw new RuntimeException("Moo"); + } + }; + + // Implementation detail: the 1st sequence if the timestamp is equal to EPOCH_OFFSET is 1. + // All other initial timestamps start with sequence == 0. + var maxPerMillisecond = 4095; + + var ids = new long[maxPerMillisecond]; + for (var i = 0; i < maxPerMillisecond; i++) { + ids[i] = impl.generateId(); + } + + soft.assertThatThrownBy(impl::generateId) + .isInstanceOf(RuntimeException.class) + .hasMessage("Moo"); + soft.assertThatThrownBy(impl::generateId) + .isInstanceOf(RuntimeException.class) + .hasMessage("Moo"); + + var expect = ((long) nodeId) << DEFAULT_SEQUENCE_BITS | 1L; + for (var i = 0; i < ids.length; i++) { + var id = ids[i]; + soft.assertThat(id).describedAs("index %d", i).isEqualTo(expect); + soft.assertThat(impl.nodeFromId(id)).isEqualTo(nodeId); + soft.assertThat(impl.timestampFromId(id)).isEqualTo(0L); + soft.assertThat(impl.sequenceFromId(id)).isEqualTo(i + 1); + soft.assertThat(impl.idToString(id)) + .isEqualTo( + format( + "%s (%d), sequence %d, node %d", + Instant.ofEpochMilli(ID_EPOCH_MILLIS), 0, i + 1, nodeId)); + expect++; + if (i % 10 == 0) { + soft.assertAll(); + } + } + } + + @Test + public void maxIdsPerMillisecondAtNowWithMutableClock() { + var clockSource = new AtomicLong(ID_EPOCH_MILLIS + TimeUnit.DAYS.toMillis(365)); + var clock = (LongSupplier) clockSource::get; + var initialTimestamp = clock.getAsLong() - ID_EPOCH_MILLIS; + + var nodeId = 42; + + var impl = + new SnowflakeIdGeneratorImpl(idGeneratorSource(nodeId, clock)) { + @Override + void spinWaitSequence() { + clockSource.incrementAndGet(); + } + }; + + for (var millis = 0; millis < 5; millis++) { + var expect = + ((initialTimestamp + millis) << (DEFAULT_SEQUENCE_BITS + DEFAULT_NODE_ID_BITS)) + | + // + ((long) nodeId) << DEFAULT_SEQUENCE_BITS; + + for (var j = 0; j < 4096; j++) { + var id = impl.generateId(); + var uuid = impl.idToTimeUuid(id); + + soft.assertThat(impl.nodeFromId(id)).isEqualTo(nodeId).isEqualTo(uuid.node()); + soft.assertThat(impl.timestampFromId(id)) + .isEqualTo(initialTimestamp + millis) + .isEqualTo(uuid.timestamp() - ID_EPOCH_MILLIS); + soft.assertThat(uuid).extracting(UUID::variant, UUID::version).containsExactly(2, 1); + soft.assertThat(impl.timeUuidToId(uuid)).isEqualTo(id); + soft.assertThat(impl.timestampUtcFromId(id)).isEqualTo(uuid.timestamp()); + soft.assertThat(impl.sequenceFromId(id)).isEqualTo(j).isEqualTo(uuid.clockSequence()); + soft.assertThat(impl.idToString(id)) + .isEqualTo( + format( + "%s (%d), sequence %d, node %d", + Instant.ofEpochMilli(ID_EPOCH_MILLIS + initialTimestamp + millis), + initialTimestamp + millis, + j, + nodeId)); + soft.assertThat(id).describedAs("millis %d - seq %d", millis, j).isEqualTo(expect); + expect++; + if (j % 10 == 0) { + soft.assertAll(); + } + } + } + } + + @Test + public void miscUuid() { + var clockSource = new AtomicLong(ID_EPOCH_MILLIS + TimeUnit.DAYS.toMillis(365)); + var clock = (LongSupplier) clockSource::get; + + var nodeId = 42; + + var impl = + new SnowflakeIdGeneratorImpl(idGeneratorSource(nodeId, clock)) { + @Override + void spinWaitSequence() { + clockSource.incrementAndGet(); + } + }; + + soft.assertThatIllegalArgumentException() + .isThrownBy(() -> impl.timeUuidToId(UUID.randomUUID())) + .withMessage("Must be a version 1 / variant 2 UUID"); + soft.assertThatIllegalArgumentException() + .isThrownBy(() -> impl.timeUuidToId(UUID.nameUUIDFromBytes("foobar".getBytes(UTF_8)))) + .withMessage("Must be a version 1 / variant 2 UUID"); + + long tsUuidHighest = 0xFFFFFFFFFFFFFFFL; + long seqUuidHighest = 0xFFFFFFFFFFFFL; + long nodeUuidHighest = 0x3FFF; + + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + impl.timeUuidToId( + new UUID( + timeUuidMsbReal(tsUuidHighest), + timeUuidLsb(seqUuidHighest, nodeUuidHighest)))) + .withMessage("TimeUUID contains values that cannot be condensed into a snowflake-ID"); + + soft.assertThatCode( + () -> + impl.timeUuidToId( + new UUID( + timeUuidMsbReal((1L << DEFAULT_TIMESTAMP_BITS) - 1), + timeUuidLsb( + (1L << DEFAULT_SEQUENCE_BITS) - 1, (1L << DEFAULT_NODE_ID_BITS) - 1)))) + .doesNotThrowAnyException(); + } + + @Test + public void maxIdsPerMillisecondAtNowWithRealClock() { + var nodeId = 42; + + var initialTimestamp = clock.currentTimeMillis() - ID_EPOCH_MILLIS; + var impl = + new SnowflakeIdGeneratorFactory() + .buildIdGenerator(Map.of(), idGeneratorSource(nodeId, clock::currentTimeMillis)); + + var ids = new HashSet(); + for (var i = 0; i < 10 * 4096; i++) { + + var id = impl.generateId(); + soft.assertThat(ids.add(id)).isTrue(); + + var uuid = impl.idToTimeUuid(id); + + soft.assertThat(impl.nodeFromId(id)).isEqualTo(nodeId).isEqualTo(uuid.node()); + soft.assertThatCode( + () -> { + assertThat(uuid.node()).isEqualTo(nodeId); + assertThat(uuid.timestamp()) + .isGreaterThan(ID_EPOCH_MILLIS) + .isEqualTo(impl.timestampUtcFromId(id)); + assertThat(uuid.clockSequence()).isGreaterThanOrEqualTo(0).isLessThan(4096); + assertThat(uuid.variant()).isEqualTo(2); + assertThat(uuid.version()).isEqualTo(1); + }) + .doesNotThrowAnyException(); + soft.assertThat(impl.timeUuidToId(uuid)).isEqualTo(id); + soft.assertThat(impl.timestampFromId(id)).isGreaterThanOrEqualTo(initialTimestamp); + soft.assertThat(impl.sequenceFromId(id)) + .isGreaterThanOrEqualTo(0) + .isLessThan(4096) + .isEqualTo(uuid.clockSequence()); + soft.assertThat(impl.idToString(id)) + .isEqualTo( + format( + "%s (%d), sequence %d, node %d", + Instant.ofEpochMilli(impl.timestampFromId(id) + ID_EPOCH_MILLIS), + impl.timestampFromId(id), + impl.sequenceFromId(id), + nodeId)); + + if (i % 10 == 0) { + soft.assertAll(); + } + } + } + + @Test + public void validationCallback() { + var nodeId = 42; + + var validationValue = new AtomicBoolean(true); + + var impl = + new SnowflakeIdGeneratorFactory() + .buildIdGenerator( + Map.of(), + new IdGeneratorSource() { + @Override + public int nodeId() { + return validationValue.get() ? nodeId : -1; + } + + @Override + public long currentTimeMillis() { + return clock.currentTimeMillis(); + } + }); + + soft.assertThatCode(impl::generateId).doesNotThrowAnyException(); + + validationValue.set(false); + + soft.assertThatIllegalStateException() + .isThrownBy(impl::generateId) + .withMessage("Cannot generate a new ID, shutting down?"); + } + + static IdGeneratorSource idGeneratorSource(int nodeId, LongSupplier clock) { + return new IdGeneratorSource() { + @Override + public int nodeId() { + return nodeId; + } + + @Override + public long currentTimeMillis() { + return clock.getAsLong(); + } + }; + } +} diff --git a/persistence/nosql/idgen/impl/src/test/resources/logback-test.xml b/persistence/nosql/idgen/impl/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..4a4d9a629d --- /dev/null +++ b/persistence/nosql/idgen/impl/src/test/resources/logback-test.xml @@ -0,0 +1,32 @@ + + + + + + + %date{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + diff --git a/persistence/nosql/idgen/mocks/build.gradle.kts b/persistence/nosql/idgen/mocks/build.gradle.kts new file mode 100644 index 0000000000..fc886dc833 --- /dev/null +++ b/persistence/nosql/idgen/mocks/build.gradle.kts @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("org.kordamp.gradle.jandex") + id("polaris-server") +} + +description = "Polaris ID generation mocks for testing" + +dependencies { + api(project(":polaris-idgen-api")) + api(project(":polaris-idgen-spi")) + api(project(":polaris-idgen-impl")) + + implementation(libs.guava) + implementation(libs.slf4j.api) + + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.inject.api) + compileOnly(libs.jakarta.enterprise.cdi.api) + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-databind") +} diff --git a/persistence/nosql/idgen/mocks/src/main/java/org/apache/polaris/ids/mocks/MutableMonotonicClock.java b/persistence/nosql/idgen/mocks/src/main/java/org/apache/polaris/ids/mocks/MutableMonotonicClock.java new file mode 100644 index 0000000000..89433733d0 --- /dev/null +++ b/persistence/nosql/idgen/mocks/src/main/java/org/apache/polaris/ids/mocks/MutableMonotonicClock.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.mocks; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import jakarta.enterprise.inject.Specializes; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.polaris.ids.impl.MonotonicClockImpl; + +@Specializes +public class MutableMonotonicClock extends MonotonicClockImpl { + private final AtomicLong nanoTime = new AtomicLong(); + private final AtomicLong currentTimeMillis = new AtomicLong(); + + public MutableMonotonicClock() { + this(System.currentTimeMillis(), System.nanoTime()); + } + + public MutableMonotonicClock(long currentTimeMillis, long nanoTime) { + super(false); + this.currentTimeMillis.set(currentTimeMillis); + this.nanoTime.set(nanoTime); + setup(); + } + + @CanIgnoreReturnValue + public MutableMonotonicClock setCurrentTimeMillis(long currentTimeMillis) { + this.currentTimeMillis.set(currentTimeMillis); + return this; + } + + @CanIgnoreReturnValue + public MutableMonotonicClock setNanoTime(long nanoTime) { + this.nanoTime.set(nanoTime); + return this; + } + + @CanIgnoreReturnValue + public MonotonicClockImpl advanceBoth(long time, TimeUnit unit) { + nanoTime.addAndGet(unit.toNanos(time)); + currentTimeMillis.addAndGet(unit.toMillis(time)); + return this; + } + + @CanIgnoreReturnValue + public MonotonicClockImpl advanceBoth(Duration duration) { + nanoTime.addAndGet(duration.toNanos()); + currentTimeMillis.addAndGet(duration.toMillis()); + return this; + } + + @CanIgnoreReturnValue + public MonotonicClockImpl advanceNanos(long time, TimeUnit unit) { + nanoTime.addAndGet(unit.toNanos(time)); + return this; + } + + @CanIgnoreReturnValue + public MonotonicClockImpl advanceNanos(Duration duration) { + nanoTime.addAndGet(duration.toNanos()); + return this; + } + + @CanIgnoreReturnValue + public MonotonicClockImpl advanceCurrentTimeMillis(long time, TimeUnit unit) { + currentTimeMillis.addAndGet(unit.toMillis(time)); + return this; + } + + @CanIgnoreReturnValue + public MonotonicClockImpl advanceCurrentTimeMillis(Duration duration) { + currentTimeMillis.addAndGet(duration.toMillis()); + return this; + } + + @Override + public long systemCurrentTimeMillis() { + return currentTimeMillis.get(); + } + + @Override + public long systemNanoTime() { + return nanoTime.get(); + } +} diff --git a/persistence/nosql/idgen/mocks/src/main/resources/META-INF/beans.xml b/persistence/nosql/idgen/mocks/src/main/resources/META-INF/beans.xml new file mode 100644 index 0000000000..a297f1aa53 --- /dev/null +++ b/persistence/nosql/idgen/mocks/src/main/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/persistence/nosql/idgen/spi/build.gradle.kts b/persistence/nosql/idgen/spi/build.gradle.kts new file mode 100644 index 0000000000..94ac6d3529 --- /dev/null +++ b/persistence/nosql/idgen/spi/build.gradle.kts @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("org.kordamp.gradle.jandex") + id("polaris-server") +} + +description = "Polaris ID generation SPI" + +dependencies { + implementation(project(":polaris-idgen-api")) + + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.inject.api) + compileOnly(libs.jakarta.enterprise.cdi.api) + + compileOnly(libs.smallrye.config.core) + compileOnly(platform(libs.quarkus.bom)) + compileOnly("io.quarkus:quarkus-core") + + annotationProcessor(project(":polaris-immutables", configuration = "processor")) + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-databind") +} diff --git a/persistence/nosql/idgen/spi/src/main/java/org/apache/polaris/ids/spi/IdGeneratorFactory.java b/persistence/nosql/idgen/spi/src/main/java/org/apache/polaris/ids/spi/IdGeneratorFactory.java new file mode 100644 index 0000000000..767d9fd53d --- /dev/null +++ b/persistence/nosql/idgen/spi/src/main/java/org/apache/polaris/ids/spi/IdGeneratorFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.spi; + +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.polaris.ids.api.IdGenerator; + +/** Provides values for ID generators, usually provided by {@code NodeLease} implementations. */ +public interface IdGeneratorFactory { + String name(); + + void validateParameters(Map params, IdGeneratorSource idGeneratorSource); + + I buildIdGenerator(Map params, IdGeneratorSource idGeneratorSource); + + I buildSystemIdGenerator(Map params); + + static IdGeneratorFactory lookupFactory(String name) { + for (IdGeneratorFactory factory : ServiceLoader.load(IdGeneratorFactory.class)) { + if (factory.name().equals(name)) { + return factory; + } + } + throw new IllegalArgumentException("No IdGeneratorFactory found for name " + name); + } +} diff --git a/persistence/nosql/idgen/spi/src/main/java/org/apache/polaris/ids/spi/IdGeneratorSource.java b/persistence/nosql/idgen/spi/src/main/java/org/apache/polaris/ids/spi/IdGeneratorSource.java new file mode 100644 index 0000000000..a84b19f1b1 --- /dev/null +++ b/persistence/nosql/idgen/spi/src/main/java/org/apache/polaris/ids/spi/IdGeneratorSource.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.ids.spi; + +/** Provides values for ID generators, usually provided by {@code NodeLease} implementations. */ +public interface IdGeneratorSource { + /** Returns the node ID if the node is active/valid or {@code -1}. */ + int nodeId(); + + long currentTimeMillis(); +}