diff --git a/gora-geode/pom.xml b/gora-geode/pom.xml new file mode 100644 index 00000000..a93ea757 --- /dev/null +++ b/gora-geode/pom.xml @@ -0,0 +1,87 @@ + + + + gora + org.apache.gora + 1.0-SNAPSHOT + ../pom.xml + + 4.0.0 + gora-geode + Apache Gora :: Geode + + + 8 + 8 + + + + + + org.apache.gora + gora-core + + + + org.apache.gora + gora-core + test-jar + test + + + + org.apache.avro + avro + + + + + org.apache.hadoop + hadoop-client + + + + + org.apache.geode + geode-core + + + + + org.slf4j + slf4j-log4j12 + + + + log4j + log4j + + + javax.jms + jms + + + + + + + junit + junit + test + + + + org.apache.hadoop + hadoop-minicluster + test + + + org.testcontainers + testcontainers + test + + + + \ No newline at end of file diff --git a/gora-geode/src/main/java/org/apache/gora/geode/query/GeodeQuery.java b/gora-geode/src/main/java/org/apache/gora/geode/query/GeodeQuery.java new file mode 100644 index 00000000..f6684d5d --- /dev/null +++ b/gora-geode/src/main/java/org/apache/gora/geode/query/GeodeQuery.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.geode.query; + +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.impl.QueryBase; +import org.apache.gora.store.DataStore; + +/** + * {@link GeodeQuery} is the primary class + * responsible for representing a cache manipulation query. + */ +public class GeodeQuery extends QueryBase { + + public GeodeQuery() { + super(null); + } + + public GeodeQuery(DataStore dataStore) { + super(dataStore); + } + +} diff --git a/gora-geode/src/main/java/org/apache/gora/geode/query/GeodeResult.java b/gora-geode/src/main/java/org/apache/gora/geode/query/GeodeResult.java new file mode 100644 index 00000000..9583270b --- /dev/null +++ b/gora-geode/src/main/java/org/apache/gora/geode/query/GeodeResult.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.geode.query; + +import org.apache.gora.geode.store.GeodeStore; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.impl.ResultBase; +import org.apache.gora.store.DataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NavigableSet; + +/** + * {@link GeodeResult} is the primary class + * responsible for representing result set of a cache manipulation query + * {@link org.apache.gora.geode.query.GeodeQuery} + */ +public class GeodeResult extends ResultBase { + + private static final Logger LOG = LoggerFactory.getLogger(GeodeResult.class); + private NavigableSet cacheKeySet; + private Iterator iterator; + private int current; + + public GeodeResult(DataStore dataStore, Query query) { + super(dataStore, query); + } + + public GeodeResult(DataStore dataStore, Query query, NavigableSet cacheKeySet) { + super(dataStore, query); + this.cacheKeySet = cacheKeySet; + this.iterator = cacheKeySet.iterator(); + this.current = 0; + } + + public GeodeStore getDataStore() { + return (GeodeStore) super.getDataStore(); + } + + @Override + public float getProgress() throws IOException { + if (cacheKeySet.size() == 0) { + return 1; + } + return ((float) current / (float) cacheKeySet.size()); + } + + @Override + protected boolean nextInner() throws IOException { + if (!iterator.hasNext()) { + return false; + } + key = iterator.next(); + LOG.info("Results set pointer is now moved to key {}.", key); + persistent = dataStore.get(key); + this.current++; + return true; + } + + @Override + public int size() { + int totalSize = cacheKeySet.size(); + int intLimit = (int) this.limit; + return intLimit > 0 && totalSize > intLimit ? intLimit : totalSize; + } +} diff --git a/gora-geode/src/main/java/org/apache/gora/geode/store/GeodeStore.java b/gora-geode/src/main/java/org/apache/gora/geode/store/GeodeStore.java new file mode 100644 index 00000000..126b800b --- /dev/null +++ b/gora-geode/src/main/java/org/apache/gora/geode/store/GeodeStore.java @@ -0,0 +1,197 @@ +package org.apache.gora.geode.store; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.gora.geode.query.GeodeQuery; +import org.apache.gora.geode.query.GeodeResult; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.PartitionQuery; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.query.impl.PartitionQueryImpl; +import org.apache.gora.store.impl.DataStoreBase; +import org.apache.gora.util.GoraException; + +import java.io.IOException; +import java.util.NavigableSet; +import java.util.Properties; +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.ConcurrentSkipListSet; + +import static org.apache.geode.cache.RegionShortcut.REPLICATE; +import static org.apache.gora.geode.store.GeodeStoreParameters.GEODE_USERNAME; +import static org.apache.gora.geode.store.GeodeStoreParameters.GEODE_SERVER_PORT; +import static org.apache.gora.geode.store.GeodeStoreParameters.GEODE_SERVER_HOST; +import static org.apache.gora.geode.store.GeodeStoreParameters.GEODE_PASSWORD; +import static org.apache.gora.geode.store.GeodeStoreParameters.GEODE_REGION_SHORTCUT; +import static org.apache.gora.geode.store.GeodeStoreParameters.PREFERRED_SCHEMA_NAME; + + +public class GeodeStore extends DataStoreBase { + + private ClientCache clientCache; + private Region region; + private Properties geodeProperties; + private CacheFactory cacheFactory; + + @Override + public void initialize(Class keyClass, Class persistentClass, Properties properties) throws GoraException { + super.initialize(keyClass, persistentClass, properties); + String geodeHostName = (String) properties.get(GEODE_SERVER_HOST); + int portNumber = Integer.parseInt((String) properties.get(GEODE_SERVER_PORT)); + clientCache = new ClientCacheFactory().addPoolLocator(geodeHostName, portNumber).create(); + String userName = properties.getProperty(GEODE_USERNAME); + String password = properties.getProperty(GEODE_PASSWORD); + geodeProperties = properties; + + Properties clientProperties = clientCache.getDistributedSystem().getProperties(); + if (userName != null) { + clientProperties.setProperty("security-username", userName); + clientProperties.setProperty("security-password", password); + } else throw new GoraException(); + cacheFactory = new CacheFactory(clientProperties); + } + + @Override + public String getSchemaName() { + String preferredSchemaName = properties.getProperty(PREFERRED_SCHEMA_NAME); + if (preferredSchemaName == null) { + return persistentClass.getSimpleName(); + } + return preferredSchemaName; + } + + @Override + public void createSchema() throws GoraException { + try { + Cache cache = cacheFactory.create(); + String regionShortCut = geodeProperties.getProperty(GEODE_REGION_SHORTCUT); + RegionFactory regionFactory; + if (regionShortCut != null) { + regionFactory = cache.createRegionFactory(RegionShortcut.valueOf(regionShortCut)); + } else { + regionFactory = cache.createRegionFactory(REPLICATE); + } + region = regionFactory.create(getSchemaName()); + } catch (Exception e) { + throw new GoraException(e); + } + } + + @Override + public void deleteSchema() { + region.destroyRegion(); + } + + @Override + public boolean schemaExists() { + Properties properties = clientCache.getDistributedSystem().getProperties(); + CacheFactory factory = new CacheFactory(properties); + Cache cache = factory.create(); + Region rf = cache.getRegion(getSchemaName()); + return rf != null; + } + + @Override + public boolean exists(K key) { + for (K existingKey : region.getInterestList()) { + if (existingKey.equals(key)) { + return true; + } + } + return false; + } + + @Override + public T get(K key, String[] fields) { + return region.get(key); + } + + @Override + public void put(K key, T obj) { + region.put(key, obj); + } + + @Override + public boolean delete(K key) { + region.destroy(key); + return true; + } + + @Override + public long deleteByQuery(Query query) throws GoraException { + try { + long deletedRows = 0; + Result result = query.execute(); + while (result.next()) { + if (delete(result.getKey())) { + deletedRows++; + } + } + LOG.info("Geode datastore deleted {} rows from Persistent datastore.", deletedRows); + return deletedRows; + } catch (Exception e) { + throw new GoraException(e); + } + } + + @Override + public Result execute(Query query) { + K startKey = query.getStartKey(); + K endKey = query.getEndKey(); + NavigableSet cacheEntrySubList = new ConcurrentSkipListSet<>(); + if (startKey != null && endKey != null) { + boolean isInTheRegion = false; + for (K key : region.keySet()) { + if (key == startKey) { + isInTheRegion = true; + } + if (isInTheRegion) { + cacheEntrySubList.add(key); + } + if (key == endKey) { + break; + } + } + } else { + // Empty + cacheEntrySubList = Collections.emptyNavigableSet(); + } + return new GeodeResult<>(this, query, cacheEntrySubList); + } + + + @Override + public Query newQuery() { + GeodeQuery query = new GeodeQuery<>(this); + query.setFields(getFieldsToQuery(null)); + return query; + } + + @Override + public List> getPartitions(Query query) throws IOException { + List> partitions = new ArrayList<>(); + PartitionQueryImpl partitionQuery = new PartitionQueryImpl<>( + query); + partitionQuery.setConf(this.getConf()); + partitions.add(partitionQuery); + return partitions; + } + + @Override + public void flush() { + LOG.info("Geode datastore flushed successfully."); + } + + @Override + public void close() { + } + +} diff --git a/gora-geode/src/main/java/org/apache/gora/geode/store/GeodeStoreParameters.java b/gora-geode/src/main/java/org/apache/gora/geode/store/GeodeStoreParameters.java new file mode 100644 index 00000000..138bc299 --- /dev/null +++ b/gora-geode/src/main/java/org/apache/gora/geode/store/GeodeStoreParameters.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.geode.store; + +/** + * Configuration Properties. + */ +public class GeodeStoreParameters { + + /** + * Property pointing to geode server contact points. + * string (multiple values with comma separated) + */ + public static final String GEODE_SERVER_HOST = "gora.geode.server.hostname"; + + /** + * Property pointing to the port to use to connect to the geode hosts. + * integer + */ + public static final String GEODE_SERVER_PORT = "gora.geode.server.port"; + + /** + * Property pointing to the gora schemaName. + * integer + */ + public static final String PREFERRED_SCHEMA_NAME = "gora.geode.preferred.schemaName"; + + /** + * Property pointing to the geode region shortcut. + * integer + */ + public static final String GEODE_REGION_SHORTCUT = "gora.geode.region.shortcut"; + + /** + * Property pointing to the username to connect to the server. + * string + */ + public static final String GEODE_USERNAME = "gora.geode.username"; + + /** + * Property pointing to the password to connect to the server. + * string + */ + public static final String GEODE_PASSWORD = "gora.geode.password"; + +} diff --git a/gora-geode/src/test/conf/gora.properties b/gora-geode/src/test/conf/gora.properties new file mode 100644 index 00000000..3e8e111a --- /dev/null +++ b/gora-geode/src/test/conf/gora.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +gora.datastore.default=org.apache.gora.geode.store.GeodeStore +gora.datastore.geode.server=localhost +gora.datastore.geode.port="gora.geode.server.port" \ No newline at end of file diff --git a/gora-geode/src/test/java/org/apache/gora/geode/GeodeStartupLogWaitStrategy.java b/gora-geode/src/test/java/org/apache/gora/geode/GeodeStartupLogWaitStrategy.java new file mode 100644 index 00000000..224036a8 --- /dev/null +++ b/gora-geode/src/test/java/org/apache/gora/geode/GeodeStartupLogWaitStrategy.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.geode; + +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; + + +/** + * Log based Geode server startup wait strategy to sync server + * startup to test suit startup. + */ +public class GeodeStartupLogWaitStrategy extends LogMessageWaitStrategy { + + private static final String regEx = ".*Apache Geode Server Started Successfully.*"; + + public GeodeStartupLogWaitStrategy() { + withRegEx(regEx); + withTimes(1); + } + +} \ No newline at end of file diff --git a/gora-geode/src/test/java/org/apache/gora/geode/GoraGeodeTestDriver.java b/gora-geode/src/test/java/org/apache/gora/geode/GoraGeodeTestDriver.java new file mode 100644 index 00000000..5beec1af --- /dev/null +++ b/gora-geode/src/test/java/org/apache/gora/geode/GoraGeodeTestDriver.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.geode; + +import org.apache.gora.GoraTestDriver; + +import org.apache.gora.geode.store.GeodeStore; +import org.apache.gora.geode.store.GeodeStoreParameters; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.apache.gora.util.GoraException; +import org.testcontainers.containers.GenericContainer; + +import java.util.Properties; + +import static org.apache.gora.geode.store.GeodeStoreParameters.GEODE_SERVER_PORT; + +/** + * Helper class for third party tests using gora-geode backend. + * @see GoraTestDriver for test specifics. + * This driver is the base for all test cases that require an Geode server. + * In this case we use docker container. A docker container is run before tests + * and it is stopped after tests. + * + */ +public class GoraGeodeTestDriver extends GoraTestDriver { + + private final GenericContainer GeodeContainer; + private Properties properties = DataStoreFactory.createProps(); + + /** + * Default constructor + */ + public GoraGeodeTestDriver(GenericContainer GeodeContainer) { + super(GeodeStore.class); + this.GeodeContainer = GeodeContainer; + } + + @Override + public void setUpClass() { + log.info("Setting up Geode Test Driver"); + properties.put(GeodeStoreParameters.GEODE_SERVER_HOST, GeodeContainer.getContainerIpAddress()); + properties.put(GEODE_SERVER_PORT, GeodeContainer.getMappedPort(10334).toString()); + this.GeodeContainer.start(); + } + + @Override + public void tearDownClass() { + this.GeodeContainer.stop(); + log.info("Teardown Geode test driver"); + } + + /** + * Instantiate a new {@link DataStore}. Uses 'null' schema. + * + * @param keyClass The key class. + * @param persistentClass The value class. + * @return A new store instance. + * @throws GoraException + */ + @Override + public DataStore createDataStore(Class keyClass, Class persistentClass) + throws GoraException { + + final DataStore dataStore = DataStoreFactory + .createDataStore((Class>) dataStoreClass, keyClass, persistentClass, conf, + properties); + dataStores.add(dataStore); + log.info("Datastore for {} was added.", persistentClass); + return dataStore; + } + +} diff --git a/gora-geode/src/test/java/store/TestGeodeStore.java b/gora-geode/src/test/java/store/TestGeodeStore.java new file mode 100644 index 00000000..ed795f4f --- /dev/null +++ b/gora-geode/src/test/java/store/TestGeodeStore.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package store; + + + +import org.apache.avro.util.Utf8; +import org.apache.gora.examples.WebPageDataCreator; +import org.apache.gora.examples.generated.WebPage; +import org.apache.gora.geode.GoraGeodeTestDriver; +import org.apache.gora.geode.query.GeodeResult; +import org.apache.gora.query.Query; +import org.apache.gora.store.DataStoreTestBase; +import org.apache.gora.util.GoraException; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.testcontainers.containers.GenericContainer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +import static org.junit.Assert.*; + +/** + * Tests extending {@link DataStoreTestBase} + * which run the base JUnit test suite for Gora. + */ +public class TestGeodeStore extends DataStoreTestBase { + + private static final String DOCKER_CONTAINER_NAME = "apachegeode/" + + "" + + "geode:1.15.0"; + + /** + * JUnit integration testing with Docker and Testcontainers + */ + @ClassRule + public static GenericContainer Geode_CONTAINER = new GenericContainer(DOCKER_CONTAINER_NAME) + .withCommand("tail","-f","/dev/null"); + + static { + try { + setTestDriver(new GoraGeodeTestDriver(Geode_CONTAINER)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void setUp() throws Exception { + super.setUp(); + } + + @Test + public void testPutAndGet() throws GoraException { + WebPage page = webPageStore.newPersistent(); + + // Write webpage data + page.setUrl(new Utf8("http://example.com")); + byte[] contentBytes = "example content in example.com".getBytes(Charset.defaultCharset()); + ByteBuffer buff = ByteBuffer.wrap(contentBytes); + page.setContent(buff); + webPageStore.put("com.example/http", page); + webPageStore.flush(); + + WebPage storedPage = webPageStore.get("com.example/http"); + + assertNotNull(storedPage); + assertEquals(page.getUrl(), storedPage.getUrl()); + } + + @Test + public void testCreateAndDeleteSchema() throws IOException { + WebPage page = webPageStore.newPersistent(); + + // Write webpage data + page.setUrl(new Utf8("http://example.com")); + webPageStore.put("com.example/http", page); + webPageStore.flush(); + + assertEquals("WebPage isn't created.", page.getUrl(), webPageStore.get("com.example/http").getUrl()); + + webPageStore.deleteSchema(); + + assertNull(webPageStore.get("com.example/http")); + } + + @Test + public void testGetSchemaName() throws IOException { + assertEquals("WebPage", webPageStore.getSchemaName()); + assertEquals("Employee", employeeStore.getSchemaName()); + } + + @Test + public void testExecute() throws IOException { + WebPageDataCreator.createWebPageData(webPageStore); + + final Query query = webPageStore.newQuery(); + + int limit = 5; + query.setLimit(limit); + GeodeResult result = (GeodeResult) webPageStore.execute(query); + assertEquals(limit, result.size()); + + limit = 10; + query.setLimit(limit); + result = (GeodeResult) webPageStore.execute(query); + assertEquals(limit, result.size()); + + } + + /** + * By design, you cannot update a Geode document blindly, you can only attempt to update a specific revision of a document. FIXME + */ + @Test + @Ignore + public void testUpdate() throws Exception { + //By design, you cannot update a Geode document blindly, you can only attempt to update a specific revision of a document. FIXME + } + + @Ignore("GeodeStore doesn't support 3 types union field yet") + @Override + public void testGet3UnionField() throws Exception { + // GeodeStore doesn't support 3 types union field yet + } + + @Ignore("Skip until GORA-66 is fixed: need better semantic for end/start keys") + @Override + public void testDeleteByQueryFields() throws IOException { + // Skip until GORA-66 is fixed: need better semantic for end/start keys + } + +} diff --git a/pom.xml b/pom.xml index 5dc91eac..ab2244ef 100755 --- a/pom.xml +++ b/pom.xml @@ -780,6 +780,7 @@ gora-arangodb gora-rethinkdb gora-elasticsearch + gora-geode gora-tutorial gora-benchmark sources-dist @@ -1639,6 +1640,13 @@ 2.0.10 + + + org.apache.geode + geode-core + 1.15.0 + + com.amazonaws