Skip to content

Commit

Permalink
Merge pull request #74 from codenotary/feature/sql-streaming
Browse files Browse the repository at this point in the history
Update schema.proto
  • Loading branch information
ostafen authored Jul 22, 2024
2 parents 75a4610 + 5bd47e1 commit 0c681c0
Show file tree
Hide file tree
Showing 13 changed files with 2,282 additions and 1,199 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
make dev
- name: Start immudb container
run: |
docker run -d --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -v ${{ github.workspace }}/tests/certs/my.key.pem:/key.pem -p 3322:3322 codenotary/immudb:1.9DOM --signingKey=/key.pem
docker run -d --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -v ${{ github.workspace }}/tests/certs/my.key.pem:/key.pem -p 3322:3322 codenotary/immudb:1.9.3 --signingKey=/key.pem
docker run -d --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -v ${{ github.workspace }}/tests/certs/my.key.pem:/key.pem -p 3333:3322 codenotary/immudb:1.5.0 --signingKey=/key.pem
docker run -d --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -v ${{ github.workspace }}/tests/certs/my.key.pem:/key.pem -p 3344:3322 codenotary/immudb:1.4.1 --signingKey=/key.pem
docker run -d --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -v ${{ github.workspace }}/tests/certs/my.key.pem:/key.pem -p 3355:3322 codenotary/immudb:1.4.0 --signingKey=/key.pem
Expand Down
22 changes: 9 additions & 13 deletions immudb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ def openSession(self, username, password, database=b"defaultdb"):
session_response = self._stub.OpenSession(
req)
self._stub = self._set_session_id_interceptor(session_response)
return transaction.Tx(self._stub, session_response, self.channel)
return transaction.Tx(self._stub, database, session_response, self.channel)

def closeSession(self):
"""Closes unmanaged session
Expand Down Expand Up @@ -1613,7 +1613,7 @@ def sqlExec(self, stmt, params={}, noWait=False):

return sqlexec.call(self._stub, self._rs, stmt, params, noWait)

def sqlQuery(self, query, params={}, columnNameMode=constants.COLUMN_NAME_MODE_NONE):
def sqlQuery(self, query, params={}, columnNameMode=constants.COLUMN_NAME_MODE_NONE, acceptStream=False):
"""Queries the database using SQL
Args:
query: a query in immudb SQL dialect.
Expand All @@ -1624,17 +1624,12 @@ def sqlQuery(self, query, params={}, columnNameMode=constants.COLUMN_NAME_MODE_N
['table1', 'table2']
"""
ret = sqlquery.call(self._stub, self._rs, query,
params, columnNameMode)
if columnNameMode in [constants.COLUMN_NAME_MODE_DATABASE, constants.COLUMN_NAME_MODE_FULL]:
# newer DB version don't insert database name anymore, we need to
# process it manually
for i, t in enumerate(ret):
newkeys = [
x.replace("[@DB]", self._currentdb.decode("utf-8")) for x in t.keys()]
k = dict(zip(newkeys, list(t.values())))
ret[i] = k
return ret
it = sqlquery.call(self._stub, self._rs, query,
params, columnNameMode, self._currentdb)
if acceptStream:
return it

return list(it)

def listTables(self):
"""List all tables in the current database
Expand Down Expand Up @@ -1710,6 +1705,7 @@ def verifiableSQLGet(self, table: str, primaryKeys: List[datatypesv2.PrimaryKey]

# immudb-py only


def getAllValues(self, keys: list): # immudb-py only
resp = batchGet.call(self._stub, self._rs, keys)
return resp
Expand Down
147 changes: 116 additions & 31 deletions immudb/datatypesv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,10 @@ class ServerInfoRequest(GRPCTransformable):
@dataclass
class ServerInfoResponse(GRPCTransformable):
version: str = None
startedAt: int = None
numTransactions: int = None
numDatabases: int = None
databasesDiskSize: int = None


@dataclass
Expand Down Expand Up @@ -737,28 +741,74 @@ class DatabaseNullableSettings(GRPCTransformable):
syncFrequency: NullableMilliseconds = None
writeBufferSize: NullableUint32 = None
ahtSettings: AHTNullableSettings = None
maxActiveTransactions: NullableUint32 = None
mvccReadSetLimit: NullableUint32 = None
vLogCacheSize: NullableUint32 = None
truncationSettings: TruncationNullableSettings = None
embeddedValues: NullableBool = None
preallocFiles: NullableBool = None

def _getHumanDataClass(self):
return grpcHumanizator(self, DatabaseSettingsV2)


@dataclass
class TruncationNullableSettings(GRPCTransformable):
retentionPeriod: NullableMilliseconds = None
truncationFrequency: NullableMilliseconds = None

def _getHumanDataClass(self):
return grpcHumanizator(self, TruncationSettings)


@dataclass
class ReplicationSettings(GRPCTransformable):
replica: Optional[bool] = None
masterDatabase: Optional[str] = None
masterAddress: Optional[str] = None
masterPort: Optional[int] = None
followerUsername: Optional[str] = None
followerPassword: Optional[str] = None
primaryDatabase: Optional[str] = None
primaryHost: Optional[str] = None
primaryPort: Optional[int] = None
primaryUsername: Optional[str] = None
primaryPassword: Optional[str] = None
syncReplication: Optional[bool] = None
syncAcks: Optional[int] = None
prefetchTxBufferSize: Optional[int] = None
replicationCommitConcurrency: Optional[int] = None
allowTxDiscarding: Optional[bool] = None
skipIntegrityCheck: Optional[bool] = None
waitForIndexing: Optional[bool] = None

def _getGRPC(self):
return schema.ReplicationNullableSettings(
replica=NullableBool(self.replica)._getGRPC(),
masterDatabase=NullableString(self.masterDatabase)._getGRPC(),
masterAddress=NullableString(self.masterAddress)._getGRPC(),
masterPort=NullableUint32(self.masterPort)._getGRPC(),
followerUsername=NullableString(self.followerUsername)._getGRPC(),
followerPassword=NullableString(self.followerPassword)._getGRPC()
primaryDatabase=NullableString(self.primaryDatabase)._getGRPC(),
primaryHost=NullableString(self.primaryHost)._getGRPC(),
primaryPort=NullableUint32(self.primaryPort)._getGRPC(),
primaryUsername=NullableString(self.primaryUsername)._getGRPC(),
primaryPassword=NullableString(self.primaryUsername)._getGRPC(),
syncReplication=NullableBool(self.syncReplication)._getGRPC(),
syncAcks=NullableUint32(self.syncAcks)._getGRPC(),
prefetchTxBufferSize=NullableUint32(
self.prefetchTxBufferSize)._getGRPC(),
replicationCommitConcurrency=NullableUint32(
self.replicationCommitConcurrency)._getGRPC(),
allowTxDiscarding=NullableBool(self.allowTxDiscarding)._getGRPC(),
skipIntegrityCheck=NullableBool(
self.skipIntegrityCheck)._getGRPC(),
waitForIndexing=NullableBool(self.waitForIndexing)._getGRPC(),
)


@dataclass
class TruncationSettings(GRPCTransformable):
retentionPeriod: Optional[int]
truncationFrequency: Optional[int]

def _getGRPC(self):
return schema.TruncationNullableSettings(
retentionPeriod=NullableMilliseconds(
self.retentionPeriod)._getGRPC(),
truncationFrequency=NullableMilliseconds(
self.truncationFrequency)._getGRPC(),
)


Expand All @@ -777,6 +827,8 @@ class IndexSettings(GRPCTransformable):
commitLogMaxOpenedFiles: Optional[int] = None
flushBufferSize: Optional[int] = None
cleanupPercentage: Optional[float] = None
maxBulkSize: Optional[int] = None
bulkPreparationTimeout: Optional[int] = None

def _getGRPC(self):
return schema.IndexNullableSettings(
Expand All @@ -798,7 +850,10 @@ def _getGRPC(self):
commitLogMaxOpenedFiles=NullableUint32(
self.commitLogMaxOpenedFiles)._getGRPC(),
flushBufferSize=NullableUint32(self.flushBufferSize)._getGRPC(),
cleanupPercentage=NullableFloat(self.cleanupPercentage)._getGRPC()
cleanupPercentage=NullableFloat(self.cleanupPercentage)._getGRPC(),
maxBulkSize=NullableUint32(self.maxBulkSize)._getGRPC(),
bulkPreparationTimeout=NullableMilliseconds(
self.bulkPreparationTimeout)._getGRPC(),
)


Expand All @@ -817,36 +872,48 @@ def _getGRPC(self):
@dataclass
class DatabaseSettingsV2(GRPCTransformable):
replicationSettings: ReplicationSettings = None
fileSize: Optional[int] = None
maxKeyLen: Optional[int] = None
maxValueLen: Optional[int] = None
maxTxEntries: Optional[int] = None
excludeCommitTime: Optional[bool] = None
fileSize: Optional[int] = None
maxKeyLen: Optional[int] = None
maxValueLen: Optional[int] = None
maxTxEntries: Optional[int] = None
excludeCommitTime: Optional[bool] = None
maxConcurrency: Optional[int] = None
maxIOConcurrency: Optional[int] = None
txLogCacheSize: Optional[int] = None
vLogMaxOpenedFiles: Optional[int] = None
txLogMaxOpenedFiles: Optional[int] = None
commitLogMaxOpenedFiles: Optional[int] = None
indexSettings: IndexSettings = None
vLogMaxOpenedFiles: Optional[int] = None
txLogMaxOpenedFiles: Optional[int] = None
commitLogMaxOpenedFiles: Optional[int] = None
indexSettings: IndexSettings = None
writeTxHeaderVersion: Optional[int] = None
autoload: Optional[bool] = None
readTxPoolSize: Optional[int] = None
autoload: Optional[bool] = None
readTxPoolSize: Optional[int] = None
syncFrequency: NullableMilliseconds = None
writeBufferSize: Optional[int] = None
ahtSettings: AHTSettings = None
maxActiveTransactions: Optional[int] = None
mvccReadSetLimit: Optional[int] = None
vLogCacheSize: Optional[int] = None
truncationSettings: TruncationSettings = None
embeddedValues: Optional[bool] = None
preallocFiles: Optional[bool] = None

def _getGRPC(self):
indexSettings = None
if self.indexSettings != None:
indexSettings = self.indexSettings._getGRPC()

replicationSettings = None
if self.replicationSettings != None:
replicationSettings = self.replicationSettings._getGRPC()

ahtSettings = None
if self.ahtSettings != None:
ahtSettings = self.ahtSettings._getGRPC()

truncSettings = None
if self.truncationSettings != None:
truncSettings = self.truncationSettings._getGRPC()

return schema.DatabaseNullableSettings(
replicationSettings=replicationSettings,
fileSize=NullableUint32(self.fileSize)._getGRPC(),
Expand All @@ -870,18 +937,32 @@ def _getGRPC(self):
readTxPoolSize=NullableUint32(self.readTxPoolSize)._getGRPC(),
syncFrequency=NullableMilliseconds(self.syncFrequency)._getGRPC(),
writeBufferSize=NullableUint32(self.writeBufferSize)._getGRPC(),
ahtSettings=ahtSettings
ahtSettings=ahtSettings,
maxActiveTransactions=NullableUint32(
self.maxActiveTransactions)._getGRPC(),
mvccReadSetLimit=NullableUint32(self.mvccReadSetLimit)._getGRPC(),
vLogCacheSize=NullableUint32(self.vLogCacheSize)._getGRPC(),
truncationSettings=truncSettings,
embeddedValues=NullableBool(self.embeddedValues)._getGRPC(),
preallocFiles=NullableBool(self.preallocFiles)._getGRPC(),
)


@dataclass
class ReplicationNullableSettings(GRPCTransformable):
replica: NullableBool = None
masterDatabase: NullableString = None
masterAddress: NullableString = None
masterPort: NullableUint32 = None
followerUsername: NullableString = None
followerPassword: NullableString = None
primaryDatabase: NullableString = None
primaryHost: NullableString = None
primaryPort: NullableUint32 = None
primaryUsername: NullableString = None
primaryPassword: NullableString = None
syncReplication: NullableBool = None
syncAcks: NullableUint32 = None
prefetchTxBufferSize: NullableUint32 = None,
replicationCommitConcurrency: NullableUint32 = None,
allowTxDiscarding: NullableBool = None,
skipIntegrityCheck: NullableBool = None,
waitForIndexing: NullableBool = None,

def _getHumanDataClass(self):
return grpcHumanizator(self, ReplicationSettings)
Expand All @@ -902,6 +983,8 @@ class IndexNullableSettings(GRPCTransformable):
commitLogMaxOpenedFiles: NullableUint32 = None
flushBufferSize: NullableUint32 = None
cleanupPercentage: NullableFloat = None
maxBulkSize: NullableUint32 = None
bulkPreparationTimeout: NullableMilliseconds = None

def _getHumanDataClass(self):
return grpcHumanizator(self, IndexSettings)
Expand Down Expand Up @@ -1035,7 +1118,7 @@ class DatabaseListRequestV2(GRPCTransformable):

@dataclass
class DatabaseListResponseV2(GRPCTransformable):
databases: List[Union[DatabaseWithSettings, DatabaseWithSettingsV2]] = None
databases: List[Union[DatabaseWithSettings, DatabaseInfo]] = None

def _getHumanDataClass(self):
return DatabaseListResponseV2(databases=[toConvert._getHumanDataClass() for toConvert in self.databases])
Expand All @@ -1048,14 +1131,16 @@ class DatabaseWithSettings(GRPCTransformable):
loaded: bool = None

def _getHumanDataClass(self):
return grpcHumanizator(self, DatabaseWithSettingsV2)
return grpcHumanizator(self, DatabaseInfo)


@dataclass
class DatabaseWithSettingsV2(GRPCTransformable):
class DatabaseInfo(GRPCTransformable):
name: str = None
settings: DatabaseSettingsV2 = None
loaded: bool = None
diskSize: int = None
numTransactions: int = None


@dataclass
Expand Down
Loading

0 comments on commit 0c681c0

Please sign in to comment.