19
19
package org .apache .kylin .source .hive ;
20
20
21
21
import java .io .IOException ;
22
- import java .util .Map ;
23
22
import java .util .Set ;
24
23
25
24
import org .apache .commons .lang .StringUtils ;
@@ -127,51 +126,58 @@ public String[] parseMapperInput(Object mapperInput) {
127
126
128
127
public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
129
128
130
- JobEngineConfig conf ;
131
129
final IJoinedFlatTableDesc flatDesc ;
130
+ final String flatTableDatabase ;
131
+ final String hdfsWorkingDir ;
132
+
132
133
String hiveViewIntermediateTables = "" ;
133
134
134
135
public BatchCubingInputSide (IJoinedFlatTableDesc flatDesc ) {
136
+ KylinConfig config = KylinConfig .getInstanceFromEnv ();
135
137
this .flatDesc = flatDesc ;
138
+ this .flatTableDatabase = config .getHiveDatabaseForIntermediateTable ();
139
+ this .hdfsWorkingDir = config .getHdfsWorkingDirectory ();
136
140
}
137
141
138
142
@ Override
139
143
public void addStepPhase1_CreateFlatTable (DefaultChainedExecutable jobFlow ) {
140
144
final String cubeName = CubingExecutableUtil .getCubeName (jobFlow .getParams ());
141
- final KylinConfig kylinConfig = CubeManager .getInstance (KylinConfig .getInstanceFromEnv ()).getCube (cubeName ).getConfig ();
142
- this .conf = new JobEngineConfig (kylinConfig );
145
+ final KylinConfig cubeConfig = CubeManager .getInstance (KylinConfig .getInstanceFromEnv ()).getCube (cubeName ).getConfig ();
146
+ JobEngineConfig conf = new JobEngineConfig (cubeConfig );
147
+
148
+ final String hiveInitStatements = JoinedFlatTable .generateHiveInitStatements (
149
+ flatTableDatabase , conf .getHiveConfFilePath (), cubeConfig .getHiveConfigOverride ()
150
+ ) ;
151
+ final String jobWorkingDir = getJobWorkingDir (jobFlow );
143
152
144
153
// create flat table first, then count and redistribute
145
- jobFlow .addTask (createFlatHiveTableStep (conf , flatDesc , jobFlow . getId () , cubeName ));
146
- if (kylinConfig .isHiveRedistributeEnabled () == true ) {
147
- jobFlow .addTask (createRedistributeFlatHiveTableStep (conf , flatDesc , jobFlow . getId () , cubeName ));
154
+ jobFlow .addTask (createFlatHiveTableStep (hiveInitStatements , jobWorkingDir , cubeName ));
155
+ if (cubeConfig .isHiveRedistributeEnabled () == true ) {
156
+ jobFlow .addTask (createRedistributeFlatHiveTableStep (hiveInitStatements , cubeName ));
148
157
}
149
- AbstractExecutable task = createLookupHiveViewMaterializationStep (jobFlow . getId () );
158
+ AbstractExecutable task = createLookupHiveViewMaterializationStep (hiveInitStatements , jobWorkingDir );
150
159
if (task != null ) {
151
160
jobFlow .addTask (task );
152
161
}
153
162
}
154
163
155
- public static AbstractExecutable createRedistributeFlatHiveTableStep (JobEngineConfig conf , IJoinedFlatTableDesc flatTableDesc , String jobId , String cubeName ) {
156
- StringBuilder hiveInitBuf = new StringBuilder ();
157
- hiveInitBuf .append ("USE " ).append (conf .getConfig ().getHiveDatabaseForIntermediateTable ()).append (";\n " );
158
- hiveInitBuf .append (JoinedFlatTable .generateHiveSetStatements (conf ));
159
- final KylinConfig kylinConfig = ((CubeSegment ) flatTableDesc .getSegment ()).getConfig ();
160
- appendHiveOverrideProperties (kylinConfig , hiveInitBuf );
164
+ private String getJobWorkingDir (DefaultChainedExecutable jobFlow ) {
165
+ return JobBuilderSupport .getJobWorkingDir (hdfsWorkingDir , jobFlow .getId ());
166
+ }
161
167
168
+ private AbstractExecutable createRedistributeFlatHiveTableStep (String hiveInitStatements , String cubeName ) {
162
169
RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep ();
163
- step .setInitStatement (hiveInitBuf . toString () );
164
- step .setIntermediateTable (flatTableDesc .getTableName ());
165
- step .setRedistributeDataStatement (JoinedFlatTable .generateRedistributeFlatTableStatement (flatTableDesc ));
170
+ step .setInitStatement (hiveInitStatements );
171
+ step .setIntermediateTable (flatDesc .getTableName ());
172
+ step .setRedistributeDataStatement (JoinedFlatTable .generateRedistributeFlatTableStatement (flatDesc ));
166
173
CubingExecutableUtil .setCubeName (cubeName , step .getParams ());
167
174
step .setName (ExecutableConstants .STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE );
168
175
return step ;
169
176
}
170
177
171
- public ShellExecutable createLookupHiveViewMaterializationStep (String jobId ) {
178
+ private ShellExecutable createLookupHiveViewMaterializationStep (String hiveInitStatements , String jobWorkingDir ) {
172
179
ShellExecutable step = new ShellExecutable ();
173
180
step .setName (ExecutableConstants .STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP );
174
- HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder ();
175
181
176
182
KylinConfig kylinConfig = ((CubeSegment ) flatDesc .getSegment ()).getConfig ();
177
183
MetadataManager metadataManager = MetadataManager .getInstance (kylinConfig );
@@ -187,16 +193,15 @@ public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
187
193
if (lookupViewsTables .size () == 0 ) {
188
194
return null ;
189
195
}
190
- appendHiveOverrideProperties2 (kylinConfig , hiveCmdBuilder );
191
- final String useDatabaseHql = "USE " + conf .getConfig ().getHiveDatabaseForIntermediateTable () + ";" ;
192
- hiveCmdBuilder .addStatement (useDatabaseHql );
193
- hiveCmdBuilder .addStatement (JoinedFlatTable .generateHiveSetStatements (conf ));
196
+
197
+ HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder ();
198
+ hiveCmdBuilder .addStatement (hiveInitStatements );
194
199
for (TableDesc lookUpTableDesc : lookupViewsTables ) {
195
200
if (lookUpTableDesc .isView ()) {
196
201
StringBuilder createIntermediateTableHql = new StringBuilder ();
197
202
createIntermediateTableHql .append ("DROP TABLE IF EXISTS " + lookUpTableDesc .getMaterializedName () + ";\n " );
198
203
createIntermediateTableHql .append ("CREATE TABLE IF NOT EXISTS " + lookUpTableDesc .getMaterializedName () + "\n " );
199
- createIntermediateTableHql .append ("LOCATION '" + JobBuilderSupport . getJobWorkingDir ( conf , jobId ) + "/" + lookUpTableDesc .getMaterializedName () + "'\n " );
204
+ createIntermediateTableHql .append ("LOCATION '" + jobWorkingDir + "/" + lookUpTableDesc .getMaterializedName () + "'\n " );
200
205
createIntermediateTableHql .append ("AS SELECT * FROM " + lookUpTableDesc .getIdentity () + ";\n " );
201
206
hiveCmdBuilder .addStatement (createIntermediateTableHql .toString ());
202
207
hiveViewIntermediateTables = hiveViewIntermediateTables + lookUpTableDesc .getMaterializedName () + ";" ;
@@ -209,30 +214,27 @@ public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
209
214
return step ;
210
215
}
211
216
212
- public static AbstractExecutable createFlatHiveTableStep (JobEngineConfig conf , IJoinedFlatTableDesc flatTableDesc , String jobId , String cubeName ) {
213
- StringBuilder hiveInitBuf = new StringBuilder ();
214
- hiveInitBuf .append (JoinedFlatTable .generateHiveSetStatements (conf ));
215
- final KylinConfig kylinConfig = ((CubeSegment ) flatTableDesc .getSegment ()).getConfig ();
216
- appendHiveOverrideProperties (kylinConfig , hiveInitBuf );
217
- final String useDatabaseHql = "USE " + conf .getConfig ().getHiveDatabaseForIntermediateTable () + ";\n " ;
218
- final String dropTableHql = JoinedFlatTable .generateDropTableStatement (flatTableDesc );
219
- final String createTableHql = JoinedFlatTable .generateCreateTableStatement (flatTableDesc , JobBuilderSupport .getJobWorkingDir (conf , jobId ));
220
- String insertDataHqls = JoinedFlatTable .generateInsertDataStatement (flatTableDesc , conf );
217
+ private AbstractExecutable createFlatHiveTableStep (String hiveInitStatements , String jobWorkingDir , String cubeName ) {
218
+ final String dropTableHql = JoinedFlatTable .generateDropTableStatement (flatDesc );
219
+ final String createTableHql = JoinedFlatTable .generateCreateTableStatement (flatDesc , jobWorkingDir );
220
+ String insertDataHqls = JoinedFlatTable .generateInsertDataStatement (flatDesc );
221
221
222
222
CreateFlatHiveTableStep step = new CreateFlatHiveTableStep ();
223
- step .setInitStatement (hiveInitBuf . toString () );
224
- step .setCreateTableStatement (useDatabaseHql + dropTableHql + createTableHql + insertDataHqls );
223
+ step .setInitStatement (hiveInitStatements );
224
+ step .setCreateTableStatement (dropTableHql + createTableHql + insertDataHqls );
225
225
CubingExecutableUtil .setCubeName (cubeName , step .getParams ());
226
226
step .setName (ExecutableConstants .STEP_NAME_CREATE_FLAT_HIVE_TABLE );
227
227
return step ;
228
228
}
229
229
230
230
@ Override
231
231
public void addStepPhase4_Cleanup (DefaultChainedExecutable jobFlow ) {
232
+ final String jobWorkingDir = getJobWorkingDir (jobFlow );
233
+
232
234
GarbageCollectionStep step = new GarbageCollectionStep ();
233
235
step .setName (ExecutableConstants .STEP_NAME_HIVE_CLEANUP );
234
236
step .setIntermediateTableIdentity (getIntermediateTableIdentity ());
235
- step .setExternalDataPath (JoinedFlatTable .getTableDir (flatDesc , JobBuilderSupport . getJobWorkingDir ( conf , jobFlow . getId ()) ));
237
+ step .setExternalDataPath (JoinedFlatTable .getTableDir (flatDesc , jobWorkingDir ));
236
238
step .setHiveViewIntermediateTableIdentities (hiveViewIntermediateTables );
237
239
jobFlow .addTask (step );
238
240
}
@@ -243,7 +245,7 @@ public IMRTableInputFormat getFlatTableInputFormat() {
243
245
}
244
246
245
247
private String getIntermediateTableIdentity () {
246
- return conf . getConfig (). getHiveDatabaseForIntermediateTable () + "." + flatDesc .getTableName ();
248
+ return flatTableDatabase + "." + flatDesc .getTableName ();
247
249
}
248
250
}
249
251
@@ -413,24 +415,5 @@ private String getExternalDataPath() {
413
415
public void setHiveViewIntermediateTableIdentities (String tableIdentities ) {
414
416
setParam ("oldHiveViewIntermediateTables" , tableIdentities );
415
417
}
416
-
417
- }
418
-
419
- private static void appendHiveOverrideProperties (final KylinConfig kylinConfig , StringBuilder hiveCmd ) {
420
- final Map <String , String > hiveConfOverride = kylinConfig .getHiveConfigOverride ();
421
- if (hiveConfOverride .isEmpty () == false ) {
422
- for (String key : hiveConfOverride .keySet ()) {
423
- hiveCmd .append ("SET " ).append (key ).append ("=" ).append (hiveConfOverride .get (key )).append (";\n " );
424
- }
425
- }
426
- }
427
-
428
- private static void appendHiveOverrideProperties2 (final KylinConfig kylinConfig , HiveCmdBuilder hiveCmdBuilder ) {
429
- final Map <String , String > hiveConfOverride = kylinConfig .getHiveConfigOverride ();
430
- if (hiveConfOverride .isEmpty () == false ) {
431
- for (String key : hiveConfOverride .keySet ()) {
432
- hiveCmdBuilder .addStatement ("SET " + key + "=" + hiveConfOverride .get (key ) + ";\n " );
433
- }
434
- }
435
418
}
436
419
}
0 commit comments