Skip to content

Commit

Permalink
Draft implementation of DynamoDB support
Browse files Browse the repository at this point in the history
  • Loading branch information
YuriyIvon committed Jul 16, 2023
1 parent ac3e746 commit b112794
Show file tree
Hide file tree
Showing 26 changed files with 1,019 additions and 7 deletions.
23 changes: 23 additions & 0 deletions src/DatabaseBenchmark/Common/ConnectionStringParser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace DatabaseBenchmark.Common
{
    /// <summary>
    /// Parses connection strings of the form <c>key1=value1;key2=value2</c>.
    /// </summary>
    public static class ConnectionStringParser
    {
        /// <summary>
        /// Splits a connection string into key/value pairs and verifies that all required keys are present.
        /// </summary>
        /// <param name="connectionString">
        /// Semicolon-separated list of <c>key=value</c> pairs. Only the first <c>=</c> of a pair separates
        /// the key from the value, so values may contain <c>=</c>. Segments without <c>=</c> are ignored.
        /// </param>
        /// <param name="requiredKeys">Keys that must be present in the connection string.</param>
        /// <returns>The parsed pairs; keys and values have surrounding whitespace removed.</returns>
        /// <exception cref="InputArgumentException">
        /// The connection string is null, a key is specified more than once, or a required key is missing.
        /// </exception>
        public static IDictionary<string, string> Parse(string connectionString, params string[] requiredKeys)
        {
            if (connectionString == null)
            {
                throw new InputArgumentException("The connection string is not specified");
            }

            var keyValues = new Dictionary<string, string>();

            foreach (var part in connectionString.Split(';'))
            {
                var keyValue = part.Split('=', 2);

                if (keyValue.Length != 2)
                {
                    continue;
                }

                // Trim so that "A=1; B=2" yields the key "B" rather than " B",
                // which would otherwise fail the required key check below.
                var key = keyValue[0].Trim();
                var value = keyValue[1].Trim();

                if (!keyValues.TryAdd(key, value))
                {
                    throw new InputArgumentException($"The key \"{key}\" is specified more than once in the connection string");
                }
            }

            foreach (var key in requiredKeys)
            {
                if (!keyValues.ContainsKey(key))
                {
                    throw new InputArgumentException($"The required key \"{key}\" was not found in the connection string");
                }
            }

            return keyValues;
        }
    }
}
1 change: 1 addition & 0 deletions src/DatabaseBenchmark/DatabaseBenchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AWSSDK.DynamoDBv2" Version="3.7.105.5" />
<PackageReference Include="CsvHelper" Version="30.0.1" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.35.1" />
<PackageReference Include="MonetDB" Version="2.1.2" />
Expand Down
4 changes: 3 additions & 1 deletion src/DatabaseBenchmark/Databases/DatabaseFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using DatabaseBenchmark.Databases.ClickHouse;
using DatabaseBenchmark.Databases.Common.Interfaces;
using DatabaseBenchmark.Databases.CosmosDb;
using DatabaseBenchmark.Databases.DynamoDb;
using DatabaseBenchmark.Databases.Elasticsearch;
using DatabaseBenchmark.Databases.MonetDb;
using DatabaseBenchmark.Databases.MongoDb;
Expand Down Expand Up @@ -34,7 +35,8 @@ public DatabaseFactory(IExecutionEnvironment environment, IOptionsProvider optio
["CosmosDb"] = (connectionString) => new CosmosDbDatabase(connectionString, environment, optionsProvider),
["ClickHouse"] = (connectionString) => new ClickHouseDatabase(connectionString, environment, optionsProvider),
["Oracle"] = (connectionString) => new OracleDatabase(connectionString, environment),
["Snowflake"] = (connectionString) => new SnowflakeDatabase(connectionString, environment)
["Snowflake"] = (connectionString) => new SnowflakeDatabase(connectionString, environment),
["DynamoDb"] = (connectionString) => new DynamoDbDatabase(connectionString, environment)
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using Amazon.DynamoDBv2.Model;
using DatabaseBenchmark.Common;
using DatabaseBenchmark.Model;

namespace DatabaseBenchmark.Databases.DynamoDb
{
    /// <summary>
    /// Converts between plain .NET values and DynamoDB <see cref="AttributeValue"/> instances.
    /// </summary>
    public static class DynamoDbAttributeValueUtils
    {
        /// <summary>
        /// Extracts a plain value from a DynamoDB attribute value.
        /// </summary>
        /// <param name="attributeValue">The attribute value to unwrap.</param>
        /// <returns>
        /// <c>null</c> for a NULL attribute, a <see cref="bool"/> for a BOOL attribute, the raw string
        /// representation for a number attribute, and the <c>S</c> property otherwise.
        /// </returns>
        public static object FromAttributeValue(AttributeValue attributeValue)
        {
            if (attributeValue.NULL)
            {
                return null;
            }

            if (attributeValue.IsBOOLSet)
            {
                return attributeValue.BOOL;
            }

            // Numbers are returned exactly as stored, i.e. as strings.
            if (attributeValue.N != null)
            {
                return attributeValue.N;
            }

            return attributeValue.S;
        }

        /// <summary>
        /// Wraps a plain value into a DynamoDB attribute value according to the column type.
        /// </summary>
        /// <param name="type">The declared type of the column the value belongs to.</param>
        /// <param name="value">The value to wrap; <c>null</c> produces a NULL attribute.</param>
        /// <returns>The attribute value representing <paramref name="value"/>.</returns>
        /// <exception cref="InputArgumentException">The column type is not handled.</exception>
        public static AttributeValue ToAttributeValue(ColumnType type, object value)
        {
            if (value == null)
            {
                return new AttributeValue { NULL = true };
            }

            return type switch
            {
                ColumnType.Boolean => new AttributeValue { BOOL = (bool)value, IsBOOLSet = true },
                ColumnType.Double => new AttributeValue { N = ToInvariantString(value) },
                ColumnType.Integer => new AttributeValue { N = ToInvariantString(value) },
                ColumnType.Long => new AttributeValue { N = ToInvariantString(value) },
                ColumnType.Text => new AttributeValue(value.ToString()),
                ColumnType.String => new AttributeValue(value.ToString()),
                ColumnType.DateTime => new AttributeValue(
                    ((DateTime)value).ToString("s", System.Globalization.CultureInfo.InvariantCulture)),
                ColumnType.Guid => new AttributeValue(value.ToString()),
                ColumnType.Json => new AttributeValue(value.ToString()),
                _ => throw new InputArgumentException($"Unknown column type \"{type}\"")
            };
        }

        // Formats a numeric value independently of the current thread culture: a plain ToString()
        // emits culture-specific decimal separators and negative signs (e.g. "1,5"), which is not
        // a valid number string for the N attribute.
        private static string ToInvariantString(object value) =>
            Convert.ToString(value, System.Globalization.CultureInfo.InvariantCulture);
    }
}
9 changes: 9 additions & 0 deletions src/DatabaseBenchmark/Databases/DynamoDb/DynamoDbConstants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace DatabaseBenchmark.Databases.DynamoDb
{
/// <summary>
/// String constants shared by the DynamoDB integration classes.
/// </summary>
public class DynamoDbConstants
{
/// <summary>Metric name literal; presumably stands for DynamoDB capacity units (TODO confirm against the code reporting it).</summary>
public const string CapacityUnitsMetric = "CU";
/// <summary>Attribute name that DynamoDbInsertBuilder adds to every item when the table has no partition key name of its own.</summary>
public const string DummyPartitionKeyName = "partitionKey";
/// <summary>Fixed value stored in the <see cref="DummyPartitionKeyName"/> attribute of such items.</summary>
public const string DummyPartitionKeyValue = "A";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;
using DatabaseBenchmark.Databases.Common.Interfaces;
using DatabaseBenchmark.Model;

namespace DatabaseBenchmark.Databases.DynamoDb
{
    /// <summary>
    /// <see cref="IDataMetricsProvider"/> implementation that reads table information through DescribeTable.
    /// </summary>
    public class DynamoDbDataMetricsProvider : IDataMetricsProvider
    {
        private readonly AmazonDynamoDBClient _client;
        private readonly Table _table;

        /// <summary>
        /// Creates a provider for the given table.
        /// </summary>
        /// <param name="client">Client used to issue the DescribeTable request.</param>
        /// <param name="table">Table whose name is queried.</param>
        public DynamoDbDataMetricsProvider(AmazonDynamoDBClient client, Table table)
        {
            _client = client;
            _table = table;
        }

        /// <summary>
        /// Returns the item count reported in the table description.
        /// </summary>
        public long GetRowCount()
        {
            var request = new DescribeTableRequest { TableName = _table.Name };
            var description = _client.DescribeTableAsync(request).Result.Table;

            //TODO: find a different way since it doesn't return the actual number
            return description.ItemCount;
        }

        /// <summary>
        /// No additional metrics are provided; always returns <c>null</c>.
        /// </summary>
        public IDictionary<string, double> GetMetrics()
        {
            return null;
        }
    }
}
121 changes: 121 additions & 0 deletions src/DatabaseBenchmark/Databases/DynamoDb/DynamoDbDatabase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using Amazon;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;
using DatabaseBenchmark.Common;
using DatabaseBenchmark.Core.Interfaces;
using DatabaseBenchmark.Databases.Common;
using DatabaseBenchmark.Databases.Common.Interfaces;
using DatabaseBenchmark.Databases.DynamoDb.Interfaces;
using DatabaseBenchmark.DataSources.Interfaces;
using DatabaseBenchmark.Model;

namespace DatabaseBenchmark.Databases.DynamoDb
{
    /// <summary>
    /// <see cref="IDatabase"/> implementation for Amazon DynamoDB.
    /// The connection string must contain the AccessKeyId, SecretAccessKey and RegionEndpoint keys.
    /// </summary>
    public class DynamoDbDatabase : IDatabase
    {
        // NOTE(review): presumably matches the 25-item limit of BatchWriteItem - confirm.
        private const int DefaultImportBatchSize = 25;

        // Delay between DescribeTable calls while waiting for a table to be created or deleted.
        private const int TableStatusPollingIntervalMilliseconds = 1000;

        private const string AccessKeyIdConnectionStringProperty = "AccessKeyId";
        private const string SecretAccessKeyConnectionStringProperty = "SecretAccessKey";
        private const string RegionEndpointConnectionStringProperty = "RegionEndpoint";

        private readonly string _connectionString;
        private readonly IExecutionEnvironment _environment;

        /// <summary>
        /// Creates the database facade.
        /// </summary>
        /// <param name="connectionString">Semicolon-separated key=value connection string.</param>
        /// <param name="environment">Execution environment used for console output.</param>
        public DynamoDbDatabase(string connectionString, IExecutionEnvironment environment)
        {
            _connectionString = connectionString;
            _environment = environment;
        }

        /// <summary>
        /// Creates the table and blocks until DynamoDB reports it as ACTIVE.
        /// </summary>
        /// <param name="table">Table definition to create.</param>
        /// <param name="dropExisting">When true, an existing table with the same name is deleted first.</param>
        public void CreateTable(Table table, bool dropExisting)
        {
            if (table.Columns.Any(c => c.DatabaseGenerated))
            {
                _environment.WriteLine("WARNING: DynamoDB doesn't support database-generated columns");
            }

            // The client is only needed for the duration of this call, so release it afterwards.
            using var client = CreateClient();

            if (dropExisting)
            {
                DropTable(client, table.Name);
            }

            var tableBuilder = new DynamoDbTableBuilder();
            var createTableRequest = tableBuilder.Build(table);
            var createResponse = client.CreateTableAsync(createTableRequest).Result;

            // CreateTable only starts the creation; the table can't be written to until it is ACTIVE.
            WaitUntilTableActive(
                client,
                createTableRequest.TableName,
                createResponse.TableDescription?.TableStatus);
        }

        /// <summary>
        /// Builds a data importer that writes the source rows into the table.
        /// </summary>
        public IDataImporter CreateDataImporter(Table table, IDataSource source, int batchSize) =>
            new DataImporterBuilder(table, source, batchSize, DefaultImportBatchSize)
                .TransactionProvider<NoTransactionProvider>()
                .InsertBuilder<IDynamoDbInsertBuilder, DynamoDbInsertBuilder>()
                .InsertExecutor<DynamoDbInsertExecutor>()
                .DataMetricsProvider<DynamoDbDataMetricsProvider>()
                .ProgressReporter<ImportProgressReporter>()
                .Environment(_environment)
                .Customize((container, lifestyle) =>
                {
                    container.Register<AmazonDynamoDBClient>(CreateClient, lifestyle);
                })
                .Build();

        /// <summary>
        /// Creates a factory of executors for a structured query against the table.
        /// </summary>
        public IQueryExecutorFactory CreateQueryExecutorFactory(Table table, Query query) =>
            new DynamoDbQueryExecutorFactory(CreateClient, table, query, _environment);

        /// <summary>
        /// Creates a factory of executors for a raw query.
        /// </summary>
        public IQueryExecutorFactory CreateRawQueryExecutorFactory(RawQuery query) =>
            new DynamoDbRawQueryExecutorFactory(CreateClient, query, _environment);

        /// <summary>
        /// Creates a factory of executors that insert rows from the data source into the table.
        /// </summary>
        public IQueryExecutorFactory CreateInsertExecutorFactory(Table table, IDataSource dataSource, int batchSize) =>
            new DynamoDbInsertExecutorFactory(CreateClient, table, dataSource, batchSize, _environment);

        // Creates a new client from the connection string; the caller owns the returned instance.
        private AmazonDynamoDBClient CreateClient()
        {
            var connectionParameters = ConnectionStringParser.Parse(
                _connectionString,
                AccessKeyIdConnectionStringProperty,
                SecretAccessKeyConnectionStringProperty,
                RegionEndpointConnectionStringProperty);

            var clientConfig = new AmazonDynamoDBConfig
            {
                RegionEndpoint = RegionEndpoint.GetBySystemName(connectionParameters[RegionEndpointConnectionStringProperty])
            };

            return new AmazonDynamoDBClient(
                connectionParameters[AccessKeyIdConnectionStringProperty],
                connectionParameters[SecretAccessKeyConnectionStringProperty],
                clientConfig);
        }

        // Polls DescribeTable until the table status becomes ACTIVE.
        private static void WaitUntilTableActive(AmazonDynamoDBClient client, string tableName, TableStatus initialStatus)
        {
            var status = initialStatus;

            while (status != TableStatus.ACTIVE)
            {
                Thread.Sleep(TableStatusPollingIntervalMilliseconds);

                try
                {
                    status = client.DescribeTableAsync(tableName).GetAwaiter().GetResult().Table.TableStatus;
                }
                catch (ResourceNotFoundException)
                {
                    // DescribeTable is eventually consistent and may not see a table
                    // that has just been created, so keep polling.
                }
            }
        }

        // Deletes the table if it exists and blocks until DescribeTable no longer finds it.
        private static void DropTable(AmazonDynamoDBClient client, string tableName)
        {
            try
            {
                client.DeleteTableAsync(
                    new DeleteTableRequest { TableName = tableName }).GetAwaiter().GetResult();
            }
            catch (ResourceNotFoundException)
            {
                // There is no such table, nothing to wait for.
                return;
            }

            while (true)
            {
                try
                {
                    client.DescribeTableAsync(tableName).GetAwaiter().GetResult();
                }
                catch (ResourceNotFoundException)
                {
                    return;
                }

                // The table is still being deleted.
                Thread.Sleep(TableStatusPollingIntervalMilliseconds);
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using DatabaseBenchmark.Core.Interfaces;

namespace DatabaseBenchmark.Databases.DynamoDb
{
    /// <summary>
    /// Placeholder <see cref="IDistinctValuesProvider"/> for DynamoDB: every request for distinct values fails.
    /// </summary>
    public class DynamoDbDistinctValuesProvider : IDistinctValuesProvider
    {
        /// <summary>
        /// Always throws; the arguments are not used.
        /// </summary>
        /// <param name="tableName">Name of the table.</param>
        /// <param name="columnName">Name of the column.</param>
        /// <exception cref="NotImplementedException">Thrown on every call.</exception>
        public object[] GetDistinctValues(string tableName, string columnName) =>
            throw new NotImplementedException("DynamoDB doesn't support distinct queries, please specify the list of possible query parameter values explicitly");
    }
}
53 changes: 53 additions & 0 deletions src/DatabaseBenchmark/Databases/DynamoDb/DynamoDbInsertBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using Amazon.DynamoDBv2.Model;
using DatabaseBenchmark.Databases.Common;
using DatabaseBenchmark.Databases.Common.Interfaces;
using DatabaseBenchmark.Databases.DynamoDb.Interfaces;
using DatabaseBenchmark.Model;

namespace DatabaseBenchmark.Databases.DynamoDb
{
    /// <summary>
    /// Reads rows from a data source and converts them into DynamoDB items, one batch at a time.
    /// </summary>
    public class DynamoDbInsertBuilder : IDynamoDbInsertBuilder
    {
        private readonly Table _table;
        private readonly IDataSourceReader _sourceReader;
        private readonly InsertBuilderOptions _options;

        /// <summary>Maximum number of items returned by a single <see cref="Build"/> call.</summary>
        public int BatchSize => _options.BatchSize;

        /// <summary>
        /// Partition key name as returned by <c>table.GetPartitionKeyName()</c>; when it is <c>null</c>,
        /// a dummy partition key attribute is added to every item.
        /// </summary>
        public string PartitionKeyName { get; }

        /// <summary>
        /// Creates an insert builder.
        /// </summary>
        /// <param name="table">Definition of the target table.</param>
        /// <param name="sourceReader">Reader providing the source rows.</param>
        /// <param name="options">Options providing the batch size.</param>
        public DynamoDbInsertBuilder(
            Table table,
            IDataSourceReader sourceReader,
            InsertBuilderOptions options)
        {
            _table = table;
            _sourceReader = sourceReader;
            _options = options;

            PartitionKeyName = table.GetPartitionKeyName();
        }

        /// <summary>
        /// Reads up to <see cref="BatchSize"/> rows from the source and converts each of them into an item.
        /// </summary>
        /// <returns>The items of the batch; empty when the source has no more rows.</returns>
        public IEnumerable<Dictionary<string, AttributeValue>> Build()
        {
            var batch = new List<Dictionary<string, AttributeValue>>();

            while (batch.Count < BatchSize && _sourceReader.ReadArray(_table.Columns, out var values))
            {
                // The values are read for all table columns, so each column must be paired with its
                // value before database-generated columns are filtered out. Indexing after the filter
                // would shift the values of every column following a database-generated one.
                // NOTE(review): assumes ReadArray returns values in the order of the columns passed to it - confirm.
                var item = _table.Columns
                    .Select((column, index) => (Column: column, Value: values[index]))
                    .Where(pair => !pair.Column.DatabaseGenerated)
                    .ToDictionary(
                        pair => pair.Column.Name,
                        pair => DynamoDbAttributeValueUtils.ToAttributeValue(pair.Column.Type, pair.Value));

                if (PartitionKeyName == null)
                {
                    item.Add(DynamoDbConstants.DummyPartitionKeyName, new AttributeValue(DynamoDbConstants.DummyPartitionKeyValue));
                }

                batch.Add(item);
            }

            return batch;
        }
    }
}
Loading

0 comments on commit b112794

Please sign in to comment.