Skip to content

Commit

Permalink
docs(azure-cosmosdb): Add scaladoc, license, experimental annotation …
Browse files Browse the repository at this point in the history
…and move the feature in CHANGELOG.md to 2.45.0 version.
  • Loading branch information
Miuler committed Dec 30, 2022
1 parent 211d889 commit ed2dfb7
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 32 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Support for read from Cosmos DB Core SQL API [#23604](https://github.com/apache/beam/issues/23604)
* MongoDB IO connector added (Go) ([#24575](https://github.com/apache/beam/issues/24575)).

## New Features / Improvements
Expand Down Expand Up @@ -98,7 +99,6 @@
* S3 implementation of the Beam filesystem (Go) ([#23991](https://github.com/apache/beam/issues/23991)).
* Support for SingleStoreDB source and sink added (Java) ([#22617](https://github.com/apache/beam/issues/22617)).
* Added support for DefaultAzureCredential authentication in Azure Filesystem (Python) ([#24210](https://github.com/apache/beam/issues/24210)).
* Support for read from Cosmos DB Core SQL API [#23610](https://github.com/apache/beam/pull/23610)

## New Features / Improvements

Expand Down
27 changes: 26 additions & 1 deletion sdks/java/io/azure-cosmosdb/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Cosmos DB Core SQL API

Compile all module azure-cosmosdb
Expand All @@ -6,6 +25,12 @@ Compile all module azure-cosmosdb
gradle sdks:java:io:azure-cosmosdb:build
```

Valite code:
```shell
gradle rat
gradle sdks:java:io:azure-cosmosdb:spotbugsMain
```

## Test

Run TEST for this module (Cosmos DB Core SQL API):
Expand All @@ -17,7 +42,7 @@ gradle sdks:java:io:azure-cosmosdb:test

## Publish in Maven Local

Publish this module
Publish this module

```shell
# apache beam core
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/io/azure-cosmosdb/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ dependencies {
dependencies {
testImplementation("org.testcontainers:azure:${cosmosContainerVersion}")
testImplementation("com.outr:scribe_2.12:3.10.4")
testImplementation("com.outr:scribe-slf4j_2.12:3.10.5")
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.mockito_core
testImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}")
testRuntimeOnly("org.apache.logging.log4j:log4j-api:$log4jVersion")
testRuntimeOnly("org.apache.logging.log4j:log4j-core:$log4jVersion")
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.azure.cosmos

import com.azure.cosmos.models.CosmosQueryRequestOptions
import com.azure.cosmos.{CosmosClient, CosmosClientBuilder}
import com.azure.cosmos.{ CosmosClient, CosmosClientBuilder }
import org.apache.beam.sdk.annotations.Experimental
import org.apache.beam.sdk.annotations.Experimental.Kind
import org.apache.beam.sdk.io.BoundedSource
import org.bson.Document
import org.slf4j.LoggerFactory


private class CosmosBoundedReader(cosmosSource: CosmosBoundedSource) extends BoundedSource.BoundedReader[Document] {
@Experimental(Kind.SOURCE_SINK)
private class CosmosBoundedReader(cosmosSource: CosmosBoundedSource) extends BoundedSource.BoundedReader[Document] {
private val log = LoggerFactory.getLogger(getClass)
private var maybeClient: Option[CosmosClient] = None
private var maybeIterator: Option[java.util.Iterator[Document]] = None
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.azure.cosmos

import org.apache.beam.sdk.coders.{Coder, SerializableCoder}
import org.apache.beam.sdk.annotations.Experimental
import org.apache.beam.sdk.annotations.Experimental.Kind
import org.apache.beam.sdk.coders.{ Coder, SerializableCoder }
import org.apache.beam.sdk.io.BoundedSource
import org.apache.beam.sdk.options.PipelineOptions
import org.bson.Document

import java.util
import java.util.Collections

class CosmosBoundedSource(val readCosmos: CosmosRead) extends BoundedSource[Document] {
/** A CosmosDB Core (SQL) API {@link BoundedSource} reading {@link Document} from a given instance. */
@Experimental(Kind.SOURCE_SINK)
private class CosmosBoundedSource(private[cosmos] val readCosmos: CosmosRead) extends BoundedSource[Document] {

/** @inheritDoc
* TODO: You have to find a better way, maybe by partition key */
override def split(desiredBundleSizeBytes: Long, options: PipelineOptions): util.List[CosmosBoundedSource] = Collections.singletonList(this)

/** @inheritDoc
* The Cosmos DB Coro (SQL) API not support this metrics by the querys */
override def getEstimatedSizeBytes(options: PipelineOptions): Long = 0L
override def getEstimatedSizeBytes(options: PipelineOptions) = 0L

override def getOutputCoder: Coder[Document] = SerializableCoder.of(classOf[Document])

override def createReader(options: PipelineOptions): BoundedSource.BoundedReader[Document] =
override def createReader(options: PipelineOptions) =
new CosmosBoundedReader(this)
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.azure.cosmos

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import org.apache.beam.sdk.annotations.Experimental
import org.apache.beam.sdk.annotations.Experimental.Kind

/** IO to read data on CosmosDB with Core (SQL) API.
*
* {{{
* pipeline.apply(
* CosmosIO
* .read()
* .withCosmosEndpoint(ENDPOINT)
* .withCosmosKey(KEY)
* .withDatabase(DATABASE_NAME)
* .withContainer(COLLECTIONS)
* .withQuery(FILTER)
* )
* }}}
*/
@Experimental(Kind.SOURCE_SINK)
@SuppressFBWarnings(Array("MS_PKGPROTECT"))
object CosmosIO {
def read(): CosmosRead = {
CosmosRead()
}
}

Original file line number Diff line number Diff line change
@@ -1,11 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.azure.cosmos

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import org.apache.beam.sdk.annotations.Experimental
import org.apache.beam.sdk.annotations.Experimental.Kind
import org.apache.beam.sdk.io.Read
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{PBegin, PCollection}
import org.apache.beam.sdk.values.{ PBegin, PCollection }
import org.bson.Document
import org.slf4j.LoggerFactory

/** A {@link PTransform} to read data from CosmosDB Core (SQL) API. */
@Experimental(Kind.SOURCE_SINK)
@SuppressFBWarnings(Array("MS_PKGPROTECT"))
case class CosmosRead(private[cosmos] val endpoint: String = null,
private[cosmos] val key: String = null,
private[cosmos] val database: String = null,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.azure.cosmos

import com.azure.cosmos.CosmosClientBuilder
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.azure.cosmos.CosmosIOTest.{CONTAINER, DATABASE, cosmosDBEmulatorContainer}
import org.apache.beam.sdk.io.azure.cosmos.CosmosIOTest.{ CONTAINER, DATABASE, cosmosDBEmulatorContainer }
import org.apache.beam.sdk.testing.PAssert
import org.apache.beam.sdk.transforms.Count
import org.apache.beam.sdk.values.PCollection
Expand All @@ -11,20 +28,22 @@ import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.LoggerFactory
import org.testcontainers.containers.CosmosDBEmulatorContainer
import org.testcontainers.utility.DockerImageName
import scribe._

import java.nio.file.Files
import scala.util.Using

@RunWith(classOf[JUnit4])
class CosmosIOTest {
private val log = LoggerFactory.getLogger("CosmosIOTest")
// @(Rule @getter)
// val pipelineWrite: TestPipeline = TestPipeline.create
// @(Rule @getter)
// val pipelineRead: TestPipeline = TestPipeline.create
Logger.root
.clearHandlers()
.clearModifiers()
.withHandler(
minimumLevel = Some(Level.Debug),
)
.replace()

@Test
def readFromCosmosCoreSqlApi(): Unit = {
Expand All @@ -50,15 +69,21 @@ class CosmosIOTest {
/** Initialization of static fields and methods */
@RunWith(classOf[JUnit4])
object CosmosIOTest {
private val log = LoggerFactory.getLogger("CosmosIOTest[Obj]")
Logger.root
.clearHandlers()
.clearModifiers()
.withHandler(
minimumLevel = Some(Level.Debug),
)
.replace()
private val DOCKER_NAME = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest"
private val cosmosDBEmulatorContainer = new CosmosDBEmulatorContainer(DockerImageName.parse(DOCKER_NAME))
private val DATABASE = "test"
private val CONTAINER = "test"

@BeforeClass
def setup(): Unit = {
log.info("Starting CosmosDB emulator")
info("Starting CosmosDB emulator")
cosmosDBEmulatorContainer.start()

val tempFolder = new TemporaryFolder
Expand All @@ -71,7 +96,7 @@ object CosmosIOTest {
System.setProperty("javax.net.ssl.trustStoreType", "PKCS12")


log.info("Creando la data -------------------------------------------------------->")
info("Creando la data -------------------------------------------------------->")
val triedCreateData = Using(new CosmosClientBuilder()
.gatewayMode
.endpointDiscoveryEnabled(false)
Expand All @@ -89,15 +114,15 @@ object CosmosIOTest {
}
if (triedCreateData.isFailure) {
val throwable = triedCreateData.failed.get
log.error("Error creando la data", throwable)
error("Error creando la data", throwable)
throw throwable
}
log.info("Data creada ------------------------------------------------------------<")
info("Data creada ------------------------------------------------------------<")
}

@AfterClass
def close(): Unit = {
log.info("Stop CosmosDB emulator")
info("Stop CosmosDB emulator")
cosmosDBEmulatorContainer.stop()
}
}

0 comments on commit ed2dfb7

Please sign in to comment.