Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hadoop: log go side msg to java side #3420

Merged
merged 2 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions sdk/java/libjfs/callback.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* JuiceFS, Copyright 2023 Juicedata, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <stdio.h>

static void (*log_callback)(const char *msg);

typedef void LogCallBack(const char *msg);

void jfs_set_logger(void*p);

void jfs_set_callback(LogCallBack *callback)
{
log_callback = callback;
jfs_set_logger(callback);
}

void jfs_callback(const char *msg)
{
if (log_callback != NULL) {
(*log_callback)(msg);
} else {
fprintf(stderr, "%s", msg);
}
}
40 changes: 40 additions & 0 deletions sdk/java/libjfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ package main
// #include <sys/stat.h>
// #include <fcntl.h>
// #include <utime.h>
// #include <stdlib.h>
// void jfs_callback(const char *msg);
import "C"
import (
"bytes"
Expand Down Expand Up @@ -143,6 +145,44 @@ type wrapper struct {
supergroup string
}

type logWriter struct {
buf chan string
}

func (w *logWriter) Write(p []byte) (int, error) {
select {
case w.buf <- string(p):
_, _ = os.Stderr.Write(p)
return len(p), nil
default:
return os.Stderr.Write(p)
}
}

func newLogWriter() *logWriter {
w := &logWriter{
buf: make(chan string, 10),
}
go func() {
for l := range w.buf {
cmsg := C.CString(l)
C.jfs_callback(cmsg)
C.free(unsafe.Pointer(cmsg))
}
}()
return w
}

//export jfs_set_logger
func jfs_set_logger(cb unsafe.Pointer) {
utils.DisableLogColor()
if cb != nil {
utils.SetOutput(newLogWriter())
} else {
utils.SetOutput(os.Stderr)
}
}

func (w *wrapper) withPid(pid int) meta.Context {
// mapping Java Thread ID to global one
ctx := meta.NewContext(w.ctx.Pid()*1000+uint32(pid), w.ctx.Uid(), w.ctx.Gids())
Expand Down
59 changes: 59 additions & 0 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import jnr.ffi.Memory;
import jnr.ffi.Pointer;
import jnr.ffi.Runtime;
import jnr.ffi.annotations.Delegate;
import jnr.ffi.annotations.In;
import jnr.ffi.annotations.Out;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -108,6 +109,11 @@ public class JuiceFileSystemImpl extends FileSystem {
private String[] storageIds;
private Random random = new Random();

/*
go call back
*/
private static Libjfs.LogCallBack callBack;

public static interface Libjfs {
long jfs_init(String name, String jsonConf, String user, String group, String superuser, String supergroup);

Expand Down Expand Up @@ -168,6 +174,51 @@ public static interface Libjfs {
int jfs_listXattr(long pid, long h, String path, Pointer buf, int size);

int jfs_removeXattr(long pid, long h, String path, String name);

void jfs_set_callback(LogCallBack callBack);

interface LogCallBack {
@Delegate
void call(String msg);
}
}

static class LogCallBackImpl implements Libjfs.LogCallBack {
Libjfs lib;

public LogCallBackImpl(Libjfs lib) {
this.lib = lib;
}

@Override
public void call(String msg){
try {
// 2022/12/20 14:48:30.808303 juicefs[80976] <ERROR>: error msg [main.go:357]
msg = msg.trim();
String[] items = msg.split("\\s+", 5);
if (items.length > 4) {
switch (items[3]) {
case "<DEBUG>:":
LOG.debug(msg);
break;
case "<INFO>:":
LOG.info(msg);
break;
case "<WARNING>:":
LOG.warn(msg);
break;
case "<ERROR>:":
LOG.error(msg);
break;
}
}
} catch (Throwable ignored){}
}

@Override
protected void finalize() throws Throwable {
lib.jfs_set_callback(null);
}
}

static int EPERM = -0x01;
Expand Down Expand Up @@ -298,6 +349,14 @@ public void initialize(URI uri, Configuration conf) throws IOException {
refreshCache(conf);

lib = loadLibrary();

synchronized (JuiceFileSystemImpl.class) {
if (callBack == null) {
callBack = new LogCallBackImpl(lib);
lib.jfs_set_callback(callBack);
}
}

JSONObject obj = new JSONObject();
String[] keys = new String[]{"meta",};
for (String key : keys) {
Expand Down