Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import org.apache.hudi.exception.HoodieException;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
Expand All @@ -31,27 +29,34 @@
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.springframework.shell.support.logging.HandlerUtils;

import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class SparkTempViewProvider implements TempViewProvider {
private static final Logger LOG = LogManager.getLogger(SparkTempViewProvider.class);
private static final Logger LOG = HandlerUtils.getLogger(SparkTempViewProvider.class);

private JavaSparkContext jsc;
private SQLContext sqlContext;

/**
 * Builds a local Spark context and SQL context for serving temp views from the CLI.
 *
 * <p>Spark's logging initialization can replace the first handler on the JUL root
 * logger that the Spring Shell CLI installed, so the handler is captured before the
 * context is created and restored afterwards.
 *
 * @param appName Spark application name
 * @throws HoodieException if the Spark context cannot be initialized (cause preserved)
 */
public SparkTempViewProvider(String appName) {
    try {
      // Capture the CLI's JUL handler before Spark init; the root logger may have
      // no handlers, so guard the index instead of assuming slot 0 exists.
      Handler[] preInitHandlers = LOG.getParent().getHandlers();
      Handler handler = preInitHandlers.length > 0 ? preInitHandlers[0] : null;

      SparkConf sparkConf = new SparkConf().setAppName(appName)
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
      jsc = new JavaSparkContext(sparkConf);
      jsc.setLogLevel("ERROR");

      sqlContext = new SQLContext(jsc);

      // Restore the original handler: drop whatever Spark put in slot 0 (if any)
      // and re-attach the handler captured above.
      if (handler != null) {
        Handler[] postInitHandlers = LOG.getParent().getHandlers();
        if (postInitHandlers.length > 0) {
          LOG.getParent().removeHandler(postInitHandlers[0]);
        }
        LOG.getParent().addHandler(handler);
      }
    } catch (Throwable ex) {
      // log full stack trace and rethrow. Without this its difficult to debug failures, if any
      LOG.log(Level.WARNING, "unable to initialize spark context ", ex);
      throw new HoodieException(ex);
    }
}
Expand Down Expand Up @@ -90,7 +95,7 @@ public void createOrReplace(String tableName, List<String> headers, List<List<Co
System.out.println("Wrote table view: " + tableName);
} catch (Throwable ex) {
// log full stack trace and rethrow. Without this its difficult to debug failures, if any
LOG.error("unable to write ", ex);
LOG.log(Level.WARNING, "unable to write ", ex);
throw new HoodieException(ex);
}
}
Expand All @@ -101,7 +106,7 @@ public void runQuery(String sqlText) {
this.sqlContext.sql(sqlText).show(Integer.MAX_VALUE, false);
} catch (Throwable ex) {
// log full stack trace and rethrow. Without this its difficult to debug failures, if any
LOG.error("unable to read ", ex);
LOG.log(Level.WARNING, "unable to read ", ex);
throw new HoodieException(ex);
}
}
Expand All @@ -112,7 +117,7 @@ public void showAllViews() {
sqlContext.sql("SHOW TABLES").show(Integer.MAX_VALUE, false);
} catch (Throwable ex) {
// log full stack trace and rethrow. Without this its difficult to debug failures, if any
LOG.error("unable to get all views ", ex);
LOG.log(Level.WARNING, "unable to get all views ", ex);
throw new HoodieException(ex);
}
}
Expand All @@ -123,7 +128,7 @@ public void deleteTable(String tableName) {
sqlContext.sql("DROP TABLE IF EXISTS " + tableName);
} catch (Throwable ex) {
// log full stack trace and rethrow. Without this its difficult to debug failures, if any
LOG.error("unable to initialize spark context ", ex);
LOG.log(Level.WARNING, "unable to initialize spark context ", ex);
throw new HoodieException(ex);
}
}
Expand Down