Skip to content
This repository was archived by the owner on Jul 22, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ matrix:
- jdk: oraclejdk7
script: mvn test
- jdk: oraclejdk8
script:
- mvn test
- mvn verify
script: mvn test
- jdk: oraclejdk8
script: mvn verify
deploy:
provider: script
script: ./scripts/deploy.sh
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ https://github.com/builtamont/cassandra-migration/releases
## Version 0.9 Release Pending Actions

* Replace `config.Cluster.java` and `config.Keyspace.java` to the one provided by DataStax Cassandra driver
* Add additional features from upstream open PRs
* ~~Add additional features from upstream open PRs~~ (DONE as per 8 September 2016 PRs)
* Add standalone Cassandra (DataStax Community Edition) integration test

## Non-Critical Pending Actions
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@
<version>2.19.1</version>
<!-- NOTE: Configuration as per http://stackoverflow.com/a/33757854 -->
<configuration>
<forkCount>3</forkCount>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
<argLine>-Xmx2048m -XX:MaxPermSize=512m</argLine>
</configuration>
<executions>
<execution>
Expand Down
245 changes: 179 additions & 66 deletions src/main/java/com/builtamont/cassandra/migration/CassandraMigration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import com.builtamont.cassandra.migration.internal.util.logging.LogFactory
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.Metadata
import com.datastax.driver.core.Session
import sun.reflect.generics.reflectiveObjects.NotImplementedException

/**
* This is the centre point of Cassandra migration, and for most users, the only class they will ever have to deal with.
Expand Down Expand Up @@ -87,24 +86,18 @@ class CassandraMigration : CassandraMigrationConfiguration {
* @return The number of successfully applied migrations.
*/
fun migrate(): Int {
return execute(object : Action<Int> {
override fun execute(session: Session): Int {
Initialize().run(session, keyspace, MigrationVersion.CURRENT.table)

val migrationResolver = createMigrationResolver()
val schemaVersionDAO = SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.table)
val migrate = Migrate(
migrationResolver,
configs.target,
schemaVersionDAO,
session,
keyspace.cluster.username,
configs.isAllowOutOfOrder
)
return execute(migrateAction())
}

return migrate.run()
}
})
/**
* Starts the database migration. All pending migrations will be applied in order.
* Calling migrate on an up-to-date database has no effect.
*
* @param session The Cassandra connection session.
* @return The number of successfully applied migrations.
*/
fun migrate(session: Session): Int {
return execute(migrateAction(), session)
}

/**
Expand All @@ -114,16 +107,18 @@ class CassandraMigration : CassandraMigrationConfiguration {
* @return All migrations sorted by version, oldest first.
*/
fun info(): MigrationInfoService {
return execute(object : Action<MigrationInfoService> {
override fun execute(session: Session): MigrationInfoService {
val migrationResolver = createMigrationResolver()
val schemaVersionDAO = SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.table)
val migrationInfoService = MigrationInfoServiceImpl(migrationResolver, schemaVersionDAO, configs.target, false, true)
migrationInfoService.refresh()
return execute(infoAction())
}

return migrationInfoService
}
})
/**
* Retrieves the complete information about all the migrations including applied, pending and current migrations with
* details and status.
*
* @param session The Cassandra connection session.
* @return All migrations sorted by version, oldest first.
*/
fun info(session: Session): MigrationInfoService {
return execute(infoAction(), session)
}

/**
Expand All @@ -136,32 +131,46 @@ class CassandraMigration : CassandraMigrationConfiguration {
* * versions have been resolved that haven't been applied yet
*/
fun validate() {
val validationError = execute(object : Action<String?> {
override fun execute(session: Session): String? {
val migrationResolver = createMigrationResolver()
val schemaVersionDAO = SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.table)
val validate = Validate(migrationResolver, configs.target, schemaVersionDAO, true, false)
return validate.run()
}
})
val validationError = execute(validateAction())

if (validationError != null) {
throw CassandraMigrationException("Validation failed. $validationError")
}
}

/**
* Validate applied migrations against resolved ones (on the filesystem or classpath)
* to detect accidental changes that may prevent the schema(s) from being recreated exactly.
*
* Validation fails if:
* * differences in migration names, types or checksums are found
* * versions have been applied that aren't resolved locally anymore
* * versions have been resolved that haven't been applied yet
*
* @param session The Cassandra connection session.
*/
fun validate(session: Session) {
val validationError = execute(validateAction(), session)

if (validationError != null) {
throw CassandraMigrationException("Validation failed. " + validationError)
throw CassandraMigrationException("Validation failed. $validationError")
}
}

/**
* Baselines an existing database, excluding all migrations up to and including baselineVersion.
*/
fun baseline() {
execute(object : Action<Unit> {
override fun execute(session: Session): Unit {
val migrationResolver = createMigrationResolver()
val schemaVersionDAO = SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.table)
val baseline = Baseline(migrationResolver, baselineVersion, schemaVersionDAO, baselineDescription, keyspace.cluster.username)
baseline.run()
}
})
execute(baselineAction())
}

/**
* Baselines an existing database, excluding all migrations up to and including baselineVersion.
*
* @param session The Cassandra connection session.
*/
fun baseline(session: Session) {
execute(baselineAction(), session)
}

/**
Expand All @@ -179,40 +188,37 @@ class CassandraMigration : CassandraMigrationConfiguration {
var cluster: Cluster? = null
var session: Session? = null
try {
if (null == keyspace)
throw IllegalArgumentException("Unable to establish Cassandra session. Keyspace is not configured.")

if (null == keyspace.cluster)
throw IllegalArgumentException("Unable to establish Cassandra session. Cluster is not configured.")
// Guard clauses: Cluster and Keyspace must be defined
val errorMsg = "Unable to establish Cassandra session"
if (keyspace == null) throw IllegalArgumentException("$errorMsg. Keyspace is not configured.")
if (keyspace.cluster == null) throw IllegalArgumentException("$errorMsg. Cluster is not configured.")
if (keyspace.name.isNullOrEmpty()) throw IllegalArgumentException("$errorMsg. Keyspace is not specified.")

// Build the Cluster
val builder = Cluster.Builder()
builder.addContactPoints(*keyspace.cluster.contactpoints).withPort(keyspace.cluster.port)
if (null != keyspace.cluster.username && !keyspace.cluster.username.trim { it <= ' ' }.isEmpty()) {
if (null != keyspace.cluster.password && !keyspace.cluster.password.trim { it <= ' ' }.isEmpty()) {
if (!keyspace.cluster.username.isNullOrBlank()) {
if (!keyspace.cluster.password.isNullOrBlank()) {
builder.withCredentials(keyspace.cluster.username, keyspace.cluster.password)
} else {
throw IllegalArgumentException("Password must be provided with username.")
}
}
cluster = builder.build()

val metadata = cluster!!.metadata
LOG.info(getConnectionInfo(metadata))
LOG.info(getConnectionInfo(cluster.metadata))

// Create a new Session
session = cluster.newSession()
if (null == keyspace.name || keyspace.name.trim { it <= ' ' }.length == 0)
throw IllegalArgumentException("Keyspace not specified.")

val keyspaces = metadata.keyspaces
var keyspaceExists = false
for (keyspaceMetadata in keyspaces) {
if (keyspaceMetadata.name.equals(keyspace.name, ignoreCase = true))
keyspaceExists = true

// Connect to the specific Keyspace context (if already defined)
val keyspaces = cluster.metadata.keyspaces.map { it.name }
val keyspaceExists = keyspaces.first { it.equals(keyspace.name, ignoreCase = true) }.isNotEmpty()
if (keyspaceExists) {
session = cluster.connect(keyspace.name)
} else {
throw CassandraMigrationException("Keyspace: ${keyspace.name} does not exist.")
}
if (keyspaceExists)
session!!.execute("USE " + keyspace.name)
else
throw CassandraMigrationException("Keyspace: " + keyspace.name + " does not exist.")

result = action.execute(session)
} finally {
Expand All @@ -229,11 +235,22 @@ class CassandraMigration : CassandraMigrationConfiguration {
} catch (e: Exception) {
LOG.warn("Error closing Cassandra cluster")
}

}
return result
}

/**
* Executes this command with an existing session, with proper resource handling and cleanup.
*
* @param action The action to execute.
* @param session The Cassandra connection session.
* @param T The action result type.
* @return The action result.
*/
internal fun <T> execute(action: Action<T>, session: Session): T {
return action.execute(session)
}

/**
* Get Cassandra connection information.
*
Expand Down Expand Up @@ -263,6 +280,102 @@ class CassandraMigration : CassandraMigrationConfiguration {
return CompositeMigrationResolver(classLoader, ScriptsLocations(*configs.scriptsLocations), configs.encoding)
}

/**
* Creates the SchemaVersionDAO.
*
* @return A configured SchemaVersionDAO instance.
*/
private fun createSchemaVersionDAO(session: Session): SchemaVersionDAO {
return SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.table)
}

/**
* @return The database migration action.
*/
private fun migrateAction(): Action<Int> {
return object: Action<Int> {
override fun execute(session: Session): Int {
Initialize().run(session, keyspace, MigrationVersion.CURRENT.table)

val migrationResolver = createMigrationResolver()
val schemaVersionDAO = createSchemaVersionDAO(session)
val migrate = Migrate(
migrationResolver,
configs.target,
schemaVersionDAO,
session,
keyspace.cluster.username,
configs.isAllowOutOfOrder
)

return migrate.run()
}
}
}

/**
* @return The migration info service action.
*/
private fun infoAction(): Action<MigrationInfoService> {
return object : Action<MigrationInfoService> {
override fun execute(session: Session): MigrationInfoService {
val migrationResolver = createMigrationResolver()
val schemaVersionDAO = createSchemaVersionDAO(session)
val migrationInfoService = MigrationInfoServiceImpl(
migrationResolver,
schemaVersionDAO,
configs.target,
outOfOrder = false,
pendingOrFuture = true
)
migrationInfoService.refresh()

return migrationInfoService
}
}
}

/**
* @return The migration validation action.
*/
private fun validateAction(): Action<String?> {
return object : Action<String?> {
override fun execute(session: Session): String? {
val migrationResolver = createMigrationResolver()
val schemaVersionDAO = createSchemaVersionDAO(session)
val validate = Validate(
migrationResolver,
configs.target,
schemaVersionDAO,
outOfOrder = true,
pendingOrFuture = false
)

return validate.run()
}
}
}

/**
* @return The migration baselining action.
*/
private fun baselineAction(): Action<Unit> {
return object : Action<Unit> {
override fun execute(session: Session): Unit {
val migrationResolver = createMigrationResolver()
val schemaVersionDAO = createSchemaVersionDAO(session)
val baseline = Baseline(
migrationResolver,
baselineVersion,
schemaVersionDAO,
baselineDescription,
keyspace.cluster.username
)
baseline.run()
}
}
}

/**
* A Cassandra migration action that can be executed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ private int calculateInstalledRank() {

Select select = QueryBuilder
.select("count")
.from(tableName + COUNTS_TABLE_NAME_SUFFIX);
.from(keyspace.getName(), tableName + COUNTS_TABLE_NAME_SUFFIX);
select.where(eq("name", "installed_rank"));

select.setConsistencyLevel(this.consistencyLevel);
Expand Down
Loading