Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import org.apache.hadoop.hbase.util.DynamicClassLoader;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.ReflectedFunctionCache;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -304,6 +305,24 @@ public static boolean isClassLoaderLoaded() {
return classLoaderLoaded;
}

private static final String PARSE_FROM = "parseFrom";

// We don't bother using the dynamic CLASS_LOADER above, because currently we can't support
// optimizing dynamically loaded classes. We can do it once we build for java9+, see the todo
// in ReflectedFunctionCache
private static final ReflectedFunctionCache<byte[], Filter> FILTERS = ReflectedFunctionCache
.create(ProtobufUtil.class.getClassLoader(), Filter.class, byte[].class, PARSE_FROM);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the content of the directory specified in hbase.dynamic.jars.dir included in the classpath that is under the domain of this classloader? I think that if there are user-provided Filter classes in the path, we should load them. I guess that we cannot assume that they will be in the o.a.h.h.filter package, so we'd have to relax our class selection criteria.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct we'd need to expand our package search, which may lead to increased start times.

I've had this branch hanging around for a long time, I'd like to get this shipped and then we can tackle custom filters in a follow up when I or someone has time

private static final ReflectedFunctionCache<byte[], ByteArrayComparable> COMPARATORS =
ReflectedFunctionCache.create(ProtobufUtil.class.getClassLoader(), ByteArrayComparable.class,
byte[].class, PARSE_FROM);

private static volatile boolean ALLOW_FAST_REFLECTION_FALLTHROUGH = true;

// Visible for tests
public static void setAllowFastReflectionFallthrough(boolean val) {
ALLOW_FAST_REFLECTION_FALLTHROUGH = val;
}

/**
* Prepend the passed bytes with four bytes of magic, {@link ProtobufMagic#PB_MAGIC}, to flag what
* follows as a protobuf in hbase. Prepend these bytes to all content written to znodes, etc.
Expand Down Expand Up @@ -1552,13 +1571,23 @@ public static ComparatorProtos.Comparator toComparator(ByteArrayComparable compa
public static ByteArrayComparable toComparator(ComparatorProtos.Comparator proto)
throws IOException {
String type = proto.getName();
String funcName = "parseFrom";
byte[] value = proto.getSerializedComparator().toByteArray();

try {
ByteArrayComparable result = COMPARATORS.getAndCallByName(type, value);
if (result != null) {
return result;
}

if (!ALLOW_FAST_REFLECTION_FALLTHROUGH) {
throw new IllegalStateException("Failed to deserialize comparator " + type
+ " because fast reflection returned null and fallthrough is disabled");
}

Class<?> c = Class.forName(type, true, ClassLoaderHolder.CLASS_LOADER);
Method parseFrom = c.getMethod(funcName, byte[].class);
Method parseFrom = c.getMethod(PARSE_FROM, byte[].class);
if (parseFrom == null) {
throw new IOException("Unable to locate function: " + funcName + " in type: " + type);
throw new IOException("Unable to locate function: " + PARSE_FROM + " in type: " + type);
}
return (ByteArrayComparable) parseFrom.invoke(null, value);
} catch (Exception e) {
Expand All @@ -1575,12 +1604,22 @@ public static ByteArrayComparable toComparator(ComparatorProtos.Comparator proto
public static Filter toFilter(FilterProtos.Filter proto) throws IOException {
String type = proto.getName();
final byte[] value = proto.getSerializedFilter().toByteArray();
String funcName = "parseFrom";

try {
Filter result = FILTERS.getAndCallByName(type, value);
if (result != null) {
return result;
}

if (!ALLOW_FAST_REFLECTION_FALLTHROUGH) {
throw new IllegalStateException("Failed to deserialize comparator " + type
+ " because fast reflection returned null and fallthrough is disabled");
}

Class<?> c = Class.forName(type, true, ClassLoaderHolder.CLASS_LOADER);
Method parseFrom = c.getMethod(funcName, byte[].class);
Method parseFrom = c.getMethod(PARSE_FROM, byte[].class);
if (parseFrom == null) {
throw new IOException("Unable to locate function: " + funcName + " in type: " + type);
throw new IOException("Unable to locate function: " + PARSE_FROM + " in type: " + type);
}
return (Filter) parseFrom.invoke(c, value);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand All @@ -25,7 +27,6 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
Expand All @@ -34,7 +35,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
Expand All @@ -48,6 +48,8 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

Expand Down Expand Up @@ -226,9 +228,9 @@ public void testDynamicFilter() throws Exception {
ProtobufUtil.toGet(getProto2);
fail("Should not be able to load the filter class");
} catch (IOException ioe) {
assertTrue(ioe.getCause() instanceof InvocationTargetException);
InvocationTargetException ite = (InvocationTargetException) ioe.getCause();
assertTrue(ite.getTargetException() instanceof DeserializationException);
// This test is deserializing a FilterList, and one of the sub-filters is not found.
// So the actual caused by is buried a few levels deep.
assertThat(Throwables.getRootCause(ioe), instanceOf(ClassNotFoundException.class));
}
FileOutputStream fos = new FileOutputStream(jarFile);
fos.write(Base64.getDecoder().decode(MOCK_FILTER_JAR));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.reflect.ClassPath;

/**
* Cache to hold resolved Functions generated through reflection. These can be costly to create, but
* then are much faster than typical Method.invoke calls when executing. Upon construction, finds
* all subclasses in the same package of the passed baseClass. For each found class, creates a
* lambda using
* {@link ReflectionUtils#getOneArgStaticMethodAsFunction(Class, String, Class, Class)}. These are
* added to a hashmap for fast lookup by name later.
* @param <I> the input argument type for the resolved functions
* @param <R> the return type for the resolved functions
*/
@InterfaceAudience.Private
final public class ReflectedFunctionCache<I, R> {

private static final Logger LOG = LoggerFactory.getLogger(ReflectedFunctionCache.class);

private final Map<String, Function<I, ? extends R>> lambdasByClass;

private ReflectedFunctionCache(Map<String, Function<I, ? extends R>> lambdasByClass) {
this.lambdasByClass = lambdasByClass;
}

/**
* Create a cache of reflected functions using the provided classloader and baseClass. Will find
* all subclasses of the provided baseClass (in the same package), and then foreach look for a
* static one-arg method with the methodName and argClass. The expectation is that the method
* returns a value whose class extends the baseClass. This was primarily designed for use by our
* Filter and Comparator parseFrom methods.
*/
public static <I, R> ReflectedFunctionCache<I, R> create(ClassLoader classLoader,
Class<R> baseClass, Class<I> argClass, String methodName) {
Map<String, Function<I, ? extends R>> lambdasByClass = new HashMap<>();
Set<? extends Class<? extends R>> classes = getSubclassesInPackage(classLoader, baseClass);
for (Class<? extends R> clazz : classes) {
Function<I, ? extends R> func = createFunction(clazz, methodName, argClass, clazz);
if (func != null) {
lambdasByClass.put(clazz.getName(), func);
}
}
return new ReflectedFunctionCache<>(lambdasByClass);
}

/**
* Get and execute the Function for the given className, passing the argument to the function and
* returning the result.
* @param className the full name of the class to lookup
* @param argument the argument to pass to the function, if found.
* @return null if a function is not found for classname, otherwise the result of the function.
*/
public R getAndCallByName(String className, I argument) {
Function<I, ? extends R> lambda = lambdasByClass.get(className);

// todo: if we ever make java9+ our lowest supported jdk version, we can
// handle generating these for newly loaded classes from our DynamicClassLoader using
// MethodHandles.privateLookupIn(). For now this is not possible, because we can't easily
// create a privileged lookup in a non-default ClassLoader.
if (lambda == null) {
return null;
}

return lambda.apply(argument);
}

private static <R> Set<Class<? extends R>> getSubclassesInPackage(ClassLoader classLoader,
Class<R> baseClass) {
try {
return ClassPath.from(classLoader).getAllClasses().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this cause we load all classes even if it is not used by now? I used to use guava's ClassPath in a project but it performed differently when executing in IDE and in command line, finally I chose to use ClassPathScanningCandidateComponentProvider in spring for scanning classes...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will cause us to load all of the matching ones, i.e. the ones i call load on below after filtering to the correct package. So it will load all of the filters in org.apache.hadoop.hbase.filter on startup.

I thought this was preferable because the number of classes is not large. Since I do it on startup, I don't need to worry about synchronization. I could only populate the cache as Filters are accessed, but then I need to handle synchronization. Would you prefer that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading the code for ClassPath#getAllClasses, it seems that it will build an exhaustive set of all classes on the classpath that are loadable. It doesn't actually load the classes. My understanding is that Stream operations are lazy, so the only classes loaded should be those that materialize in the final collect.

.filter(clazz -> clazz.getPackageName().equalsIgnoreCase(baseClass.getPackage().getName()))
.map(ClassPath.ClassInfo::load).filter(clazz -> !Modifier.isAbstract(clazz.getModifiers()))
.filter(baseClass::isAssignableFrom).map(clazz -> (Class<? extends R>) clazz)
.collect(Collectors.toSet());
} catch (IOException e) {
LOG.debug("Failed to resolve subclasses of {}", baseClass, e);
return Collections.emptySet();
}
}

private static <I, O> Function<I, O> createFunction(Class<?> clazz, String methodName,
Class<I> argumentClazz, Class<O> returnValueClass) {
try {
return ReflectionUtils.getOneArgStaticMethodAsFunction(clazz, methodName, argumentClazz,
returnValueClass);
} catch (Throwable e) {
LOG.debug("Failed to create function for class={}", clazz, e);
return null;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.lang.invoke.CallSite;
import java.lang.invoke.LambdaMetafactory;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
Expand All @@ -29,6 +34,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.util.function.Function;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;

Expand Down Expand Up @@ -208,6 +214,30 @@ private static String getTaskName(long id, String name) {
return id + " (" + name + ")";
}

/**
* Creates a Function which can be called to performantly execute a reflected static method. The
* creation of the Function itself may not be fast, but executing that method thereafter should be
* much faster than {@link #invokeMethod(Object, String, Object...)}.
* @param lookupClazz the class to find the static method in
* @param methodName the method name
* @param argumentClazz the type of the argument
* @param returnValueClass the type of the return value
* @return a function which when called executes the requested static method.
* @throws Throwable exception types from the underlying reflection
*/
public static <I, R> Function<I, R> getOneArgStaticMethodAsFunction(Class<?> lookupClazz,
String methodName, Class<I> argumentClazz, Class<R> returnValueClass) throws Throwable {
MethodHandles.Lookup lookup = MethodHandles.lookup();
MethodHandle methodHandle = lookup.findStatic(lookupClazz, methodName,
MethodType.methodType(returnValueClass, argumentClazz));
CallSite site =
LambdaMetafactory.metafactory(lookup, "apply", MethodType.methodType(Function.class),
methodHandle.type().generic(), methodHandle, methodHandle.type());

return (Function<I, R>) site.getTarget().invokeExact();

}

/**
* Get and invoke the target method from the given object with given parameters
* @param obj the object to get and invoke method from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -202,4 +203,20 @@ public static void addJarFilesToJar(File targetJar, String libPrefix, File... sr
public static String localDirPath(Configuration conf) {
return conf.get(ClassLoaderBase.LOCAL_DIR_KEY) + File.separator + "jars" + File.separator;
}

public static void deleteClass(String className, String testDir, Configuration conf)
throws Exception {
String jarFileName = className + ".jar";
File file = new File(testDir, jarFileName);
file.delete();
assertFalse("Should be deleted: " + file.getPath(), file.exists());

file = new File(conf.get("hbase.dynamic.jars.dir"), jarFileName);
file.delete();
assertFalse("Should be deleted: " + file.getPath(), file.exists());

file = new File(ClassLoaderTestHelper.localDirPath(conf), jarFileName);
file.delete();
assertFalse("Should be deleted: " + file.getPath(), file.exists());
}
}
Loading