From 64a21365012e9085fee4103df2ae1688f7f909f2 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 9 Dec 2024 16:21:41 +0100 Subject: [PATCH] Fix nvidia-smi parsing --- .../fixtures/{process.txt => process_1.txt} | 0 .../psutil/gpu/nvidia/fixtures/process_2.txt | 22 ++++ resources/psutil/gpu/nvidia/nvidia.go | 101 +++++++++++------ resources/psutil/gpu/nvidia/nvidia_test.go | 106 +++++++++++++++++- 4 files changed, 192 insertions(+), 37 deletions(-) rename resources/psutil/gpu/nvidia/fixtures/{process.txt => process_1.txt} (100%) create mode 100644 resources/psutil/gpu/nvidia/fixtures/process_2.txt diff --git a/resources/psutil/gpu/nvidia/fixtures/process.txt b/resources/psutil/gpu/nvidia/fixtures/process_1.txt similarity index 100% rename from resources/psutil/gpu/nvidia/fixtures/process.txt rename to resources/psutil/gpu/nvidia/fixtures/process_1.txt diff --git a/resources/psutil/gpu/nvidia/fixtures/process_2.txt b/resources/psutil/gpu/nvidia/fixtures/process_2.txt new file mode 100644 index 00000000..c29844d5 --- /dev/null +++ b/resources/psutil/gpu/nvidia/fixtures/process_2.txt @@ -0,0 +1,22 @@ +# gpu pid type sm mem enc dec jpg ofa fb ccpm command +# Idx # C/G % % % % % % MB MB name + 0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg + 1 - - - - - - - - - - - + 0 3908637 C 4 0 3 0 - - 1209 0 ffmpeg + 1 - - - - - - - - - - - + 0 3908637 C 4 0 3 0 - - 1209 0 ffmpeg + 1 - - - - - - - - - - - + 0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg + 1 - - - - - - - - - - - + 0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg + 1 - - - - - - - - - - - + 0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg + 1 - - - - - - - - - - - + 0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg + 1 - - - - - - - - - - - + 0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg + 1 - - - - - - - - - - - + 0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg + 1 - - - - - - - - - - - + 0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg + 1 - - - - - - - - - - - \ No newline at end of file diff --git a/resources/psutil/gpu/nvidia/nvidia.go b/resources/psutil/gpu/nvidia/nvidia.go index c46bbce1..709da1ab 100644 --- a/resources/psutil/gpu/nvidia/nvidia.go +++ b/resources/psutil/gpu/nvidia/nvidia.go @@ -9,6 +9,7 @@ import ( "regexp" "slices" "strconv" + "strings" "sync" "time" @@ -140,6 +141,7 @@ type writerProcess struct { buf bytes.Buffer ch chan Process terminator []byte + matcher *processMatcher } func (w *writerProcess) Write(data []byte) (int, error) { @@ -160,7 +162,7 @@ func (w *writerProcess) Write(data []byte) (int, error) { break } - s, err := parseProcess(content) + s, err := w.matcher.Parse(content) if err != nil { continue } @@ -171,66 +173,96 @@ func (w *writerProcess) Write(data []byte) (int, error) { return n, nil } -const processMatcher = `^\s*([0-9]+)\s+([0-9]+)\s+[A-Z]\s+([0-9-]+)\s+[0-9-]+\s+([0-9-]+)\s+([0-9-]+)\s+([0-9]+).*` +type processMatcher struct { + re *regexp.Regexp + mapping map[string]int +} -// # gpu pid type sm mem enc dec fb command -// # Idx # C/G % % % % MB name -// -// 0 7372 C 2 0 2 - 136 ffmpeg -// 0 12176 C 5 2 3 7 782 ffmpeg -// 0 20035 C 8 2 4 1 1145 ffmpeg -// 0 20141 C 2 1 1 3 429 ffmpeg -// 0 29591 C 2 1 - 2 435 ffmpeg -var reProcessMatcher = regexp.MustCompile(processMatcher) +func newProcessMatcher() *processMatcher { + m := &processMatcher{ + re: regexp.MustCompile(`\s+`), + mapping: map[string]int{}, + } -func parseProcess(data []byte) (Process, error) { + return m +} + +func (m *processMatcher) Parse(data []byte) (Process, error) { p := Process{} if len(data) == 0 { return p, fmt.Errorf("empty line") } - if data[0] == '#' { + line := string(data) + + if strings.HasPrefix(line, "# gpu") { + m.mapping = map[string]int{} + columns := m.re.Split(strings.TrimPrefix(line, "# "), -1) + for i, column := range columns { + m.mapping[column] = i + } + } + + if line[0] == '#' { return p, fmt.Errorf("comment") } - matches := reProcessMatcher.FindStringSubmatch(string(data)) - if matches == nil { + columns := m.re.Split(strings.TrimSpace(line), -1) + if len(columns) == 0 { return p, fmt.Errorf("no matches found") } - if len(matches) != 7 { - return p, fmt.Errorf("not the expected number of matches found") + if columns[0] == line { + return p, fmt.Errorf("no matches found") } - if d, err := strconv.ParseInt(matches[1], 10, 0); err == nil { - p.Index = int(d) + if index, ok := m.mapping["gpu"]; ok { + if d, err := strconv.ParseInt(columns[index], 10, 0); err == nil { + p.Index = int(d) + } } - if d, err := strconv.ParseInt(matches[2], 10, 32); err == nil { - p.PID = int32(d) + if index, ok := m.mapping["pid"]; ok { + if d, err := strconv.ParseInt(columns[index], 10, 32); err == nil { + p.PID = int32(d) + } } - if matches[3][0] != '-' { - if d, err := strconv.ParseFloat(matches[3], 64); err == nil { - p.Usage = d + if index, ok := m.mapping["sm"]; ok { + if columns[index] != "-" { + if d, err := strconv.ParseFloat(columns[index], 64); err == nil { + p.Usage = d + } } } - if matches[4][0] != '-' { - if d, err := strconv.ParseFloat(matches[4], 64); err == nil { - p.Encoder = d + if index, ok := m.mapping["enc"]; ok { + if columns[index] != "-" { + if d, err := strconv.ParseFloat(columns[index], 64); err == nil { + p.Encoder = d + } } } - if matches[5][0] != '-' { - if d, err := strconv.ParseFloat(matches[5], 64); err == nil { - p.Decoder = d + if index, ok := m.mapping["dec"]; ok { + if columns[index] != "-" { + if d, err := strconv.ParseFloat(columns[index], 64); err == nil { + p.Decoder = d + } } } - if d, err := strconv.ParseUint(matches[6], 10, 64); err == nil { - p.Memory = d * 1024 * 1024 + if index, ok := m.mapping["fb"]; ok { + if columns[index] != "-" { + if d, err := strconv.ParseUint(columns[index], 10, 64); err == nil { + p.Memory = d * 1024 * 1024 + } + } + } + + if p.PID == 0 { + return p, fmt.Errorf("no process found") } return p, nil @@ -271,6 +303,7 @@ func New(path string) gpu.GPU { n.wrProcess = &writerProcess{ ch: make(chan Process, 32), terminator: []byte("\n"), + matcher: newProcessMatcher(), } ctx, cancel := context.WithCancel(context.Background()) @@ -379,12 +412,14 @@ func (n *nvidia) runProcessOnce(path string) (map[int32]Process, error) { return nil, err } + matcher := newProcessMatcher() + lines := bytes.Split(data.Bytes(), []byte{'\n'}) process := map[int32]Process{} for _, line := range lines { - p, err := parseProcess(line) + p, err := matcher.Parse(line) if err != nil { continue } diff --git a/resources/psutil/gpu/nvidia/nvidia_test.go b/resources/psutil/gpu/nvidia/nvidia_test.go index 918d235e..aa871b7b 100644 --- a/resources/psutil/gpu/nvidia/nvidia_test.go +++ b/resources/psutil/gpu/nvidia/nvidia_test.go @@ -85,15 +85,47 @@ func TestParseQuery(t *testing.T) { }, nv) } +func TestProcessMatcher(t *testing.T) { + matcher := newProcessMatcher() + + _, err := matcher.Parse([]byte("# gpu pid type sm mem enc dec fb command")) + require.Error(t, err) + + require.Equal(t, map[string]int{ + "gpu": 0, + "pid": 1, + "type": 2, + "sm": 3, + "mem": 4, + "enc": 5, + "dec": 6, + "fb": 7, + "command": 8, + }, matcher.mapping) + + p, err := matcher.Parse([]byte(" 0 7372 C 42 1 12 34 136 ffmpeg ")) + require.NoError(t, err) + + require.Equal(t, Process{ + Index: 0, + PID: 7372, + Memory: 136 * 1024 * 1024, + Usage: 42, + Encoder: 12, + Decoder: 34, + }, p) +} + func TestParseProcess(t *testing.T) { - data, err := os.ReadFile("./fixtures/process.txt") + data, err := os.ReadFile("./fixtures/process_1.txt") require.NoError(t, err) + matcher := newProcessMatcher() lines := bytes.Split(data, []byte("\n")) process := map[int32]Process{} for _, line := range lines { - p, err := parseProcess(line) + p, err := matcher.Parse(line) if err != nil { continue } @@ -143,17 +175,44 @@ func TestParseProcess(t *testing.T) { Decoder: 1, }, }, process) + + data, err = os.ReadFile("./fixtures/process_2.txt") + require.NoError(t, err) + + lines = bytes.Split(data, []byte("\n")) + process = map[int32]Process{} + + for _, line := range lines { + p, err := matcher.Parse(line) + if err != nil { + continue + } + + process[p.PID] = p + } + + require.Equal(t, map[int32]Process{ + 3908637: { + Index: 0, + PID: 3908637, + Memory: 1209 * 1024 * 1024, + Usage: 5, + Encoder: 3, + Decoder: 0, + }, + }, process) } func TestParseProcessNoProcesses(t *testing.T) { data, err := os.ReadFile("./fixtures/process_noprocesses.txt") require.NoError(t, err) + matcher := newProcessMatcher() lines := bytes.Split(data, []byte("\n")) process := map[int32]Process{} for _, line := range lines { - p, err := parseProcess(line) + p, err := matcher.Parse(line) if err != nil { continue } @@ -219,12 +278,13 @@ func TestWriterQuery(t *testing.T) { } func TestWriterProcess(t *testing.T) { - data, err := os.ReadFile("./fixtures/process.txt") + data, err := os.ReadFile("./fixtures/process_1.txt") require.NoError(t, err) wr := &writerProcess{ ch: make(chan Process, 32), terminator: []byte("\n"), + matcher: newProcessMatcher(), } process := map[int32]Process{} @@ -287,6 +347,44 @@ func TestWriterProcess(t *testing.T) { Decoder: 1, }, }, process) + + data, err = os.ReadFile("./fixtures/process_2.txt") + require.NoError(t, err) + + wr = &writerProcess{ + ch: make(chan Process, 32), + terminator: []byte("\n"), + matcher: newProcessMatcher(), + } + + process = map[int32]Process{} + wg = sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + for p := range wr.ch { + process[p.PID] = p + } + }() + + _, err = wr.Write(data) + require.NoError(t, err) + + close(wr.ch) + + wg.Wait() + + require.Equal(t, map[int32]Process{ + 3908637: { + Index: 0, + PID: 3908637, + Memory: 1209 * 1024 * 1024, + Usage: 5, + Encoder: 3, + Decoder: 0, + }, + }, process) } func TestNvidiaGPUCount(t *testing.T) {