Skip to content

Commit a9e050d

Browse files
authored
KYLIN-5181 Support to use postgreSQL to store metadata
* KYLIN-5181, support to use postgreSQL to store metadata * minor fix * minor fix, remove useless comment * minor fix, same index name can not create in a schema * resolve confict
1 parent 63f9ac6 commit a9e050d

File tree

5 files changed

+80
-16
lines changed

5 files changed

+80
-16
lines changed

core-common/pom.xml

+6-2
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
to you under the Apache License, Version 2.0 (the
88
"License"); you may not use this file except in compliance
99
with the License. You may obtain a copy of the License at
10-
10+
1111
http://www.apache.org/licenses/LICENSE-2.0
12-
12+
1313
Unless required by applicable law or agreed to in writing, software
1414
distributed under the License is distributed on an "AS IS" BASIS,
1515
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -101,6 +101,10 @@
101101
<artifactId>mysql-connector-java</artifactId>
102102
<scope>provided</scope>
103103
</dependency>
104+
<dependency>
105+
<groupId>org.postgresql</groupId>
106+
<artifactId>postgresql</artifactId>
107+
</dependency>
104108

105109
<dependency>
106110
<groupId>org.mockito</groupId>

core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java

+24-10
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class JDBCResourceStore extends PushdownResourceStore {
5050
private static final String META_TABLE_KEY = "META_TABLE_KEY";
5151
private static final String META_TABLE_TS = "META_TABLE_TS";
5252
private static final String META_TABLE_CONTENT = "META_TABLE_CONTENT";
53+
private static final String DIALECT_OF_PG = "postgresql";
5354
private static Logger logger = LoggerFactory.getLogger(JDBCResourceStore.class);
5455
private JDBCConnectionManager connectionManager;
5556

@@ -123,6 +124,9 @@ public void execute(Connection connection) throws SQLException {
123124

124125
try {
125126
String indexName = "IDX_" + META_TABLE_TS;
127+
if (DIALECT_OF_PG.equals(kylinConfig.getMetadataDialect())) {
128+
indexName += System.currentTimeMillis();
129+
}
126130
String createIndexSql = sqls.getCreateIndexSql(indexName, tableName, META_TABLE_TS);
127131
logger.info("Creating index: {}", createIndexSql);
128132
pstat = connection.prepareStatement(createIndexSql);
@@ -314,7 +318,13 @@ private InputStream getInputStream(String resPath, ResultSet rs) throws SQLExcep
314318
if (rs == null) {
315319
return null;
316320
}
317-
321+
if (DIALECT_OF_PG.equals(kylinConfig.getMetadataDialect())) {
322+
InputStream inputStream = rs.getBinaryStream(META_TABLE_CONTENT);
323+
if (inputStream == null) {
324+
return openPushdown(resPath);
325+
}
326+
return inputStream;
327+
}
318328
Blob blob = rs.getBlob(META_TABLE_CONTENT);
319329

320330
if (blob == null || blob.length() == 0) {
@@ -355,13 +365,13 @@ public void execute(Connection connection) throws SQLException, IOException {
355365
if (existing) {
356366
pstat = connection.prepareStatement(sqls.getReplaceSql());
357367
pstat.setLong(1, ts);
358-
pstat.setBlob(2, new BufferedInputStream(new ByteArrayInputStream(bytes)));
368+
pstat.setBinaryStream(2, new BufferedInputStream(new ByteArrayInputStream(bytes)));
359369
pstat.setString(3, resPath);
360370
} else {
361371
pstat = connection.prepareStatement(sqls.getInsertSql());
362372
pstat.setString(1, resPath);
363373
pstat.setLong(2, ts);
364-
pstat.setBlob(3, new BufferedInputStream(new ByteArrayInputStream(bytes)));
374+
pstat.setBinaryStream(3, new BufferedInputStream(new ByteArrayInputStream(bytes)));
365375
}
366376

367377
if (isContentOverflow(bytes, resPath)) {
@@ -376,8 +386,9 @@ public void execute(Connection connection) throws SQLException, IOException {
376386
RollbackablePushdown pushdown = writePushdown(resPath, ContentWriter.create(bytes));
377387
try {
378388
int result = pstat.executeUpdate();
379-
if (result != 1)
389+
if (result != 1) {
380390
throw new SQLException();
391+
}
381392
} catch (Exception e) {
382393
pushdown.rollback();
383394
throw e;
@@ -414,10 +425,11 @@ private boolean isContentOverflow(byte[] content, String resPath) throws SQLExce
414425
}
415426

416427
int maxSize = kylinConfig.getJdbcResourceStoreMaxCellSize();
417-
if (content.length > maxSize)
428+
if (content.length > maxSize) {
418429
return true;
419-
else
430+
} else {
420431
return false;
432+
}
421433
}
422434

423435
@Override
@@ -454,8 +466,9 @@ public void execute(Connection connection) throws SQLException, IOException {
454466
RollbackablePushdown pushdown = writePushdown(resPath, ContentWriter.create(content));
455467
try {
456468
int result = pstat.executeUpdate();
457-
if (result != 1)
469+
if (result != 1) {
458470
throw new SQLException();
471+
}
459472
} catch (Throwable e) {
460473
pushdown.rollback();
461474
throw e;
@@ -466,7 +479,7 @@ public void execute(Connection connection) throws SQLException, IOException {
466479
pstat = connection.prepareStatement(sqls.getInsertSql());
467480
pstat.setString(1, resPath);
468481
pstat.setLong(2, newTS);
469-
pstat.setBlob(3, new BufferedInputStream(new ByteArrayInputStream(content)));
482+
pstat.setBinaryStream(3, new BufferedInputStream(new ByteArrayInputStream(content)));
470483
pstat.executeUpdate();
471484
}
472485
} else {
@@ -481,8 +494,9 @@ public void execute(Connection connection) throws SQLException, IOException {
481494
RollbackablePushdown pushdown = writePushdown(resPath, ContentWriter.create(content));
482495
try {
483496
int result = pstat.executeUpdate();
484-
if (result != 1)
497+
if (result != 1) {
485498
throw new SQLException();
499+
}
486500
} catch (Throwable e) {
487501
pushdown.rollback();
488502
throw e;
@@ -647,4 +661,4 @@ abstract static class SqlOperation {
647661
abstract public void execute(final Connection connection) throws SQLException, IOException;
648662
}
649663

650-
}
664+
}

core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,9 @@ public static void main(String[] args) throws IOException {
7070
tool.addExcludes(exclude.split("\\s*,\\s*"));
7171
}
7272
String group = System.getProperty("group");
73-
if (group != null)
73+
if (group != null) {
7474
tool.parallelCopyGroupSize = Integer.parseInt(group);
75+
}
7576

7677
tool.addExcludes(IMMUTABLE_PREFIX.toArray(new String[IMMUTABLE_PREFIX.size()]));
7778

@@ -151,8 +152,9 @@ private void addExcludes(String[] arg) {
151152

152153
private void copyParallel(KylinConfig from, KylinConfig to, String folder) throws IOException {
153154
ResourceParallelCopier copier = new ResourceParallelCopier(ResourceStore.getStore(from), ResourceStore.getStore(to));
154-
if (parallelCopyGroupSize > 0)
155+
if (parallelCopyGroupSize > 0) {
155156
copier.setGroupSize(parallelCopyGroupSize);
157+
}
156158

157159
Stats stats = copier.copy(folder, includes, excludes, new Stats() {
158160

@@ -178,10 +180,12 @@ void onRetry(int errorResourceCnt) {
178180
});
179181

180182
if (stats.hasError()) {
181-
for (String errGroup : stats.errorGroups)
183+
for (String errGroup : stats.errorGroups) {
182184
System.out.println("Failed to copy resource group: " + errGroup + "*");
183-
for (String errResPath : stats.errorResourcePaths)
185+
}
186+
for (String errResPath : stats.errorResourcePaths) {
184187
System.out.println("Failed to copy resource: " + errResPath);
188+
}
185189
throw new IOException("Failed to copy " + stats.errorResource.get() + " resource");
186190
}
187191
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
###JDBC METASTORE
19+
format.sql.create-if-need=create table if not exists {0} ( {1} VARCHAR(255) primary key, {2} BIGINT, {3} BYTEA )
20+
format.sql.key-equals=select {0} from {1} where {2} = ?
21+
format.sql.delete-pstat=delete from {0} where {1} = ?
22+
format.sql.list-resource=select {0} from {1} where {2} like ?
23+
format.sql.all-resource=select {0} from {1} where {2} like ? escape ''#'' and {3} >= ? and {4} < ?
24+
format.sql.replace=update {0} set {1} = ?,{2} = ? where {3} = ?
25+
format.sql.insert=insert into {0}({1},{2},{3}) values(?,?,?)
26+
format.sql.replace-without-content=update {0} set {1} = ? where {2} = ?
27+
format.sql.insert-without-content=insert into {0}({1},{2}) values(?,?)
28+
format.sql.update-content-ts=update {0} set {1}=?,{2} = ? where {3}=? and {4}=?
29+
format.sql.test.create=create table if not exists {0} (name VARCHAR(255) primary key, id BIGINT)
30+
format.sql.test.drop=drop table if exists {0}
31+
format.sql.create-index=create index {0} on {1} ({2})
32+
format.sql.check-table-exists=SELECT table_name FROM information_schema.tables \
33+
WHERE table_schema='''public''' AND table_type='''BASE TABLE''' AND table_name=''{0}'';

pom.xml

+9
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@
8989
<!-- mysql versions -->
9090
<mysql-connector.version>5.1.8</mysql-connector.version>
9191

92+
<!-- postgre SQL versions -->
93+
<postgresql.version>42.3.3</postgresql.version>
94+
9295
<!-- Scala versions -->
9396
<scala.version>2.12.10</scala.version>
9497
<scala.binary.version>2.12</scala.binary.version>
@@ -593,6 +596,11 @@
593596
<version>${mysql-connector.version}</version>
594597
<scope>provided</scope>
595598
</dependency>
599+
<dependency>
600+
<groupId>org.postgresql</groupId>
601+
<artifactId>postgresql</artifactId>
602+
<version>${postgresql.version}</version>
603+
</dependency>
596604
<!-- Hive dependencies -->
597605
<dependency>
598606
<groupId>org.apache.hive</groupId>
@@ -1837,6 +1845,7 @@
18371845
</plugins>
18381846
</build>
18391847
</profile>
1848+
18401849
<profile>
18411850
<id>m2e-only</id>
18421851
<activation>

0 commit comments

Comments
 (0)