diff --git a/LICENSE b/LICENSE index 95c7a9c7261..76555a026f4 100644 --- a/LICENSE +++ b/LICENSE @@ -213,6 +213,7 @@ Apache Spark ./client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java ./client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java ./common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java +./common/src/main/java/org/apache/celeborn/common/network/util/NettyLogger.java ./common/src/main/java/org/apache/celeborn/common/unsafe/Platform.java ./common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java ./common/src/main/scala/org/apache/celeborn/common/util/SignalUtils.scala @@ -229,6 +230,7 @@ Apache Spark ./worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBIterator.java ./worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDB.java ./worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java +./worker/src/main/scala/org/apache/celeborn/service/deploy/worker/profiler/JVMProfiler.scala Apache Kyuubi ./common/src/main/java/org/apache/celeborn/reflect/DynClasses.java diff --git a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java index 488e0fd04f4..ec0d1fd8774 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java +++ b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java @@ -23,6 +23,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; @@ -36,6 +37,7 @@ import org.apache.celeborn.common.network.protocol.MessageEncoder; import org.apache.celeborn.common.network.server.*; import org.apache.celeborn.common.network.util.FrameDecoder; +import org.apache.celeborn.common.network.util.NettyLogger; import org.apache.celeborn.common.network.util.TransportConf; import org.apache.celeborn.common.network.util.TransportFrameDecoder; @@ -55,6 +57,7 @@ public class TransportContext { private static final Logger logger = LoggerFactory.getLogger(TransportContext.class); + private static final NettyLogger nettyLogger = new NettyLogger(); private final TransportConf conf; private final BaseMessageHandler msgHandler; private final ChannelDuplexHandler channelsLimiter; @@ -147,12 +150,15 @@ public TransportChannelHandler initializePipeline( ChannelInboundHandlerAdapter decoder, BaseMessageHandler resolvedMsgHandler) { try { + ChannelPipeline pipeline = channel.pipeline(); + if (nettyLogger.getLoggingHandler() != null) { + pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler()); + } if (channelsLimiter != null) { - channel.pipeline().addLast("limiter", channelsLimiter); + pipeline.addLast("limiter", channelsLimiter); } TransportChannelHandler channelHandler = createChannelHandler(channel, resolvedMsgHandler); - channel - .pipeline() + pipeline .addLast("encoder", ENCODER) .addLast(FrameDecoder.HANDLER_NAME, decoder) .addLast( diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/MessageWithHeader.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/MessageWithHeader.java index 21f11d49f85..c2de2612891 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/MessageWithHeader.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/MessageWithHeader.java @@ -192,4 +192,9 @@ public boolean release(int decrement) { } return super.release(decrement); } + + @Override + public String toString() { + return "MessageWithHeader [headerLength: " + headerLength + ", bodyLength: " + bodyLength + "]"; + } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/NettyLogger.java b/common/src/main/java/org/apache/celeborn/common/network/util/NettyLogger.java new file mode 100644 index 00000000000..eb7c5ffbfd0 --- /dev/null +++ b/common/src/main/java/org/apache/celeborn/common/network/util/NettyLogger.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.network.util; + +import java.io.IOException; +import java.io.InputStream; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Netty logger that constructs a log handler depending on the log level. + * + *

Note: code copied from Apache Spark. + */ +public class NettyLogger { + private static final Logger logger = LoggerFactory.getLogger(NettyLogger.class); + + /** A Netty LoggingHandler which does not dump the message contents. */ + private static class NoContentLoggingHandler extends LoggingHandler { + + NoContentLoggingHandler(Class clazz, LogLevel level) { + super(clazz, level); + } + + @Override + protected String format(ChannelHandlerContext ctx, String eventName, Object arg) { + if (arg instanceof ByteBuf) { + return format(ctx, eventName) + " " + ((ByteBuf) arg).readableBytes() + "B"; + } else if (arg instanceof ByteBufHolder) { + return format(ctx, eventName) + " " + ((ByteBufHolder) arg).content().readableBytes() + "B"; + } else if (arg instanceof InputStream) { + int available = -1; + try { + available = ((InputStream) arg).available(); + } catch (IOException ex) { + // Swallow, but return -1 to indicate an error happened + } + return format(ctx, eventName, arg) + " " + available + "B"; + } else { + return super.format(ctx, eventName, arg); + } + } + } + + private final LoggingHandler loggingHandler; + + public NettyLogger() { + if (logger.isTraceEnabled()) { + loggingHandler = new LoggingHandler(NettyLogger.class, LogLevel.TRACE); + } else if (logger.isDebugEnabled()) { + loggingHandler = new NoContentLoggingHandler(NettyLogger.class, LogLevel.DEBUG); + } else { + loggingHandler = null; + } + } + + public LoggingHandler getLoggingHandler() { + return loggingHandler; + } +}