4141import  java .util .LinkedHashMap ;
4242import  java .util .List ;
4343import  java .util .Map ;
44+ import  java .util .Properties ;
4445import  java .util .Set ;
4546import  java .util .Stack ;
4647import  java .util .concurrent .TimeUnit ;
105106import  org .apache .hadoop .hive .ql .plan .MapWork ;
106107import  org .apache .hadoop .hive .ql .plan .MergeJoinWork ;
107108import  org .apache .hadoop .hive .ql .plan .OperatorDesc ;
109+ import  org .apache .hadoop .hive .ql .plan .PartitionDesc ;
108110import  org .apache .hadoop .hive .ql .plan .ReduceWork ;
109111import  org .apache .hadoop .hive .ql .plan .TableDesc ;
110112import  org .apache .hadoop .hive .ql .plan .TezEdgeProperty ;
133135import  org .apache .hadoop .yarn .api .records .URL ;
134136import  org .apache .hadoop .yarn .util .ConverterUtils ;
135137import  org .apache .hadoop .yarn .util .Records ;
138+ import  org .apache .kafka .clients .CommonClientConfigs ;
139+ import  org .apache .kafka .common .security .auth .SecurityProtocol ;
136140import  org .apache .tez .common .TezUtils ;
137141import  org .apache .tez .dag .api .DAG ;
138142import  org .apache .tez .dag .api .DataSinkDescriptor ;
@@ -179,13 +183,29 @@ public class DagUtils {
179183  public  static  final  String  TEZ_TMP_DIR_KEY  = "_hive_tez_tmp_dir" ;
180184  private  static  final  Logger  LOG  = LoggerFactory .getLogger (DagUtils .class .getName ());
181185  private  static  final  String  TEZ_DIR  = "_tez_scratch_dir" ;
182-   private  static  final  DagUtils  instance  = new  DagUtils (defaultCredentialSuppliers () );
186+   private  static  final  DagUtils  instance  = new  DagUtils ();
183187  // The merge file being currently processed. 
184188  public  static  final  String  TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX  =
185189      "hive.tez.current.merge.file.prefix" ;
186190  // A comma separated list of work names used as prefix. 
187191  public  static  final  String  TEZ_MERGE_WORK_FILE_PREFIXES  = "hive.tez.merge.file.prefixes" ;
188-   private  final  List <DagCredentialSupplier > credentialSuppliers ;
192+   private  static  final  List <DagCredentialSupplier > credentialSuppliers  = new  ArrayList <>();
193+ 
194+   /** 
195+    * MANDATORY Table property indicating kafka broker(s) connection string. 
196+    */ 
197+   private  static  final  String  HIVE_KAFKA_BOOTSTRAP_SERVERS  = "kafka.bootstrap.servers" ;
198+ 
199+   /** 
200+    * Table property prefix used to inject kafka consumer properties, e.g "kafka.consumer.max.poll.records" = "5000" 
201+    * this will lead to inject max.poll.records=5000 to the Kafka Consumer. NOT MANDATORY defaults to nothing 
202+    */ 
203+   private  static  final  String  CONSUMER_CONFIGURATION_PREFIX  = "kafka.consumer" ;
204+ 
205+   /** 
206+    * Table property prefix used to inject kafka producer properties, e.g "kafka.producer.lingers.ms" = "100". 
207+    */ 
208+   private  static  final  String  PRODUCER_CONFIGURATION_PREFIX  = "kafka.producer" ;
189209  /** 
190210   * Notifiers to synchronize resource localization across threads. If one thread is localizing 
191211   * a file, other threads can wait on the corresponding notifier object instead of just sleeping 
@@ -286,6 +306,23 @@ private void getCredentialsFromSuppliers(BaseWork work, Set<TableDesc> tables, D
286306    if  (!UserGroupInformation .isSecurityEnabled ()){
287307      return ;
288308    }
309+     Map <String , PartitionDesc > partitions  = ((MapWork ) work ).getAliasToPartnInfo ();
310+ 
311+     // We don't need to iterate on all partitions, and check the same TableDesc. 
312+     PartitionDesc  partition  = partitions .values ().stream ().findFirst ().orElse (null );
313+     if  (partition  != null ) {
314+       TableDesc  tableDesc  = partition .getTableDesc ();
315+       if  (isTokenRequired (tableDesc )) {
316+         addCredentialSuppliers (Collections .singletonList ("org.apache.hadoop.hive.kafka.KafkaDagCredentialSupplier" ));
317+       }
318+     }
319+ 
320+     for  (TableDesc  tableDesc  : tables ) {
321+       if  (isTokenRequired (tableDesc )) {
322+         addCredentialSuppliers (Collections .singletonList ("org.apache.hadoop.hive.kafka.KafkaDagCredentialSupplier" ));
323+         break ;
324+       }
325+     }
289326    for  (DagCredentialSupplier  supplier  : credentialSuppliers ) {
290327      Text  alias  = supplier .getTokenAlias ();
291328      Token <?> t  = dag .getCredentials ().getToken (alias );
@@ -300,11 +337,10 @@ private void getCredentialsFromSuppliers(BaseWork work, Set<TableDesc> tables, D
300337    }
301338  }
302339
303-   private  static  List <DagCredentialSupplier > defaultCredentialSuppliers () {
340+   @ VisibleForTesting 
341+   static  void  addCredentialSuppliers (List <String > supplierClassNames ) {
304342    // Class names of credential providers that should be used when adding credentials to the dag. 
305343    // Use plain strings instead of {@link Class#getName()} to avoid compile scope dependencies to other modules. 
306-     List <String > supplierClassNames  =
307-         Collections .singletonList ("org.apache.hadoop.hive.kafka.KafkaDagCredentialSupplier" );
308344    List <DagCredentialSupplier > dagSuppliers  = new  ArrayList <>();
309345    for  (String  s  : supplierClassNames ) {
310346      try  {
@@ -314,7 +350,46 @@ private static List<DagCredentialSupplier> defaultCredentialSuppliers() {
314350        LOG .error ("Failed to add credential supplier" , e );
315351      }
316352    }
317-     return  dagSuppliers ;
353+     credentialSuppliers .addAll (dagSuppliers );
354+   }
355+ 
356+   /** 
357+    * Returns the security protocol if one is defined in the properties and null otherwise. 
358+    * <p>The following properties are examined to determine the protocol:</p> 
359+    * <ol> 
360+    *   <li>security.protocol</li> 
361+    *   <li>kafka.consumer.security.protocol</li> 
362+    *   <li>kafka.producer.security.protocol</li> 
363+    * </ol> 
364+    * <p>and the first non null/not empty is returned.</p> 
365+    * <p>Defining multiple security protocols at the same time is invalid but this method is lenient and tries to pick 
366+    * the most reasonable option.</p> 
367+    * @param props the properties from which to obtain the protocol. 
368+    * @return the security protocol if one is defined in the properties and null otherwise. 
369+    */ 
370+   static  SecurityProtocol  getSecurityProtocol (Properties  props ) {
371+     String [] securityProtocolConfigs  = new  String [] { CommonClientConfigs .SECURITY_PROTOCOL_CONFIG ,
372+             CONSUMER_CONFIGURATION_PREFIX  + "."  + CommonClientConfigs .SECURITY_PROTOCOL_CONFIG ,
373+             PRODUCER_CONFIGURATION_PREFIX  + "."  + CommonClientConfigs .SECURITY_PROTOCOL_CONFIG  };
374+     for  (String  c  : securityProtocolConfigs ) {
375+       String  v  = props .getProperty (c );
376+       if  (v  != null  && !v .isEmpty ()) {
377+         return  SecurityProtocol .forName (v );
378+       }
379+     }
380+     return  null ;
381+   }
382+ 
383+   /** 
384+    * Returns whether a Kafka token is required for performing operations on the specified table. 
385+    * If "security.protocol" is set to "PLAINTEXT", we don't need to collect delegation token at all. 
386+    * 
387+    * @return true if a Kafka token is required for performing operations on the specified table and false otherwise. 
388+    */ 
389+   private  boolean  isTokenRequired (TableDesc  tableDesc ) {
390+     String  kafkaBrokers  = tableDesc .getProperties ().getProperty (HIVE_KAFKA_BOOTSTRAP_SERVERS );
391+     SecurityProtocol  protocol  = getSecurityProtocol (tableDesc .getProperties ());
392+     return  !StringUtils .isEmpty (kafkaBrokers ) && SecurityProtocol .PLAINTEXT  != protocol ;
318393  }
319394
320395  private  void  collectNeededFileSinkData (BaseWork  work , Set <URI > fileSinkUris , Set <TableDesc > fileSinkTableDescs ) {
@@ -1697,8 +1772,8 @@ public static String getUserSpecifiedDagName(Configuration conf) {
16971772  }
16981773
16991774  @ VisibleForTesting 
1700-   DagUtils (List < DagCredentialSupplier >  suppliers ) {
1701-     this . credentialSuppliers  =  suppliers ; 
1775+   DagUtils () {
1776+     // don't instantiate 
17021777  }
17031778
17041779  /** 
0 commit comments