Skip to content

Commit

Permalink
Fix nvidia-smi parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
ioppermann committed Dec 9, 2024
1 parent cfc5b7d commit 64a2136
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 37 deletions.
22 changes: 22 additions & 0 deletions resources/psutil/gpu/nvidia/fixtures/process_2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# gpu pid type sm mem enc dec jpg ofa fb ccpm command
# Idx # C/G % % % % % % MB MB name
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 4 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 4 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
0 3908637 C 5 0 3 0 - - 1209 0 ffmpeg
1 - - - - - - - - - - -
101 changes: 68 additions & 33 deletions resources/psutil/gpu/nvidia/nvidia.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"regexp"
"slices"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -140,6 +141,7 @@ type writerProcess struct {
buf bytes.Buffer
ch chan Process
terminator []byte
matcher *processMatcher
}

func (w *writerProcess) Write(data []byte) (int, error) {
Expand All @@ -160,7 +162,7 @@ func (w *writerProcess) Write(data []byte) (int, error) {
break
}

s, err := parseProcess(content)
s, err := w.matcher.Parse(content)
if err != nil {
continue
}
Expand All @@ -171,66 +173,96 @@ func (w *writerProcess) Write(data []byte) (int, error) {
return n, nil
}

const processMatcher = `^\s*([0-9]+)\s+([0-9]+)\s+[A-Z]\s+([0-9-]+)\s+[0-9-]+\s+([0-9-]+)\s+([0-9-]+)\s+([0-9]+).*`
// processMatcher parses process lines by column name instead of by fixed
// position. The column layout is learned from a "# gpu ..." header line.
type processMatcher struct {
// re splits a line into columns on runs of whitespace.
re *regexp.Regexp
// mapping holds, for each column name of the most recently seen header
// line, the zero-based position of that column.
mapping map[string]int
}

// # gpu pid type sm mem enc dec fb command
// # Idx # C/G % % % % MB name
//
// 0 7372 C 2 0 2 - 136 ffmpeg
// 0 12176 C 5 2 3 7 782 ffmpeg
// 0 20035 C 8 2 4 1 1145 ffmpeg
// 0 20141 C 2 1 1 3 429 ffmpeg
// 0 29591 C 2 1 - 2 435 ffmpeg
var reProcessMatcher = regexp.MustCompile(processMatcher)
// newProcessMatcher returns a processMatcher whose regexp splits on runs of
// whitespace and whose column mapping starts out empty.
func newProcessMatcher() *processMatcher {
m := &processMatcher{
re: regexp.MustCompile(`\s+`),
mapping: map[string]int{},
}

// NOTE(review): the following line looks like a removed line from the diff
// rendering (the old parseProcess signature) rather than part of this
// function - confirm against the actual file.
func parseProcess(data []byte) (Process, error) {
return m
}

func (m *processMatcher) Parse(data []byte) (Process, error) {
p := Process{}

if len(data) == 0 {
return p, fmt.Errorf("empty line")
}

if data[0] == '#' {
line := string(data)

if strings.HasPrefix(line, "# gpu") {
m.mapping = map[string]int{}
columns := m.re.Split(strings.TrimPrefix(line, "# "), -1)
for i, column := range columns {
m.mapping[column] = i
}
}

if line[0] == '#' {
return p, fmt.Errorf("comment")
}

matches := reProcessMatcher.FindStringSubmatch(string(data))
if matches == nil {
columns := m.re.Split(strings.TrimSpace(line), -1)
if len(columns) == 0 {
return p, fmt.Errorf("no matches found")
}

if len(matches) != 7 {
return p, fmt.Errorf("not the expected number of matches found")
if columns[0] == line {
return p, fmt.Errorf("no matches found")
}

if d, err := strconv.ParseInt(matches[1], 10, 0); err == nil {
p.Index = int(d)
if index, ok := m.mapping["gpu"]; ok {
if d, err := strconv.ParseInt(columns[index], 10, 0); err == nil {
p.Index = int(d)
}
}

if d, err := strconv.ParseInt(matches[2], 10, 32); err == nil {
p.PID = int32(d)
if index, ok := m.mapping["pid"]; ok {
if d, err := strconv.ParseInt(columns[index], 10, 32); err == nil {
p.PID = int32(d)
}
}

if matches[3][0] != '-' {
if d, err := strconv.ParseFloat(matches[3], 64); err == nil {
p.Usage = d
if index, ok := m.mapping["sm"]; ok {
if columns[index] != "-" {
if d, err := strconv.ParseFloat(columns[index], 64); err == nil {
p.Usage = d
}
}
}

if matches[4][0] != '-' {
if d, err := strconv.ParseFloat(matches[4], 64); err == nil {
p.Encoder = d
if index, ok := m.mapping["enc"]; ok {
if columns[index] != "-" {
if d, err := strconv.ParseFloat(columns[index], 64); err == nil {
p.Encoder = d
}
}
}

if matches[5][0] != '-' {
if d, err := strconv.ParseFloat(matches[5], 64); err == nil {
p.Decoder = d
if index, ok := m.mapping["dec"]; ok {
if columns[index] != "-" {
if d, err := strconv.ParseFloat(columns[index], 64); err == nil {
p.Decoder = d
}
}
}

if d, err := strconv.ParseUint(matches[6], 10, 64); err == nil {
p.Memory = d * 1024 * 1024
if index, ok := m.mapping["fb"]; ok {
if columns[index] != "-" {
if d, err := strconv.ParseUint(columns[index], 10, 64); err == nil {
p.Memory = d * 1024 * 1024
}
}
}

if p.PID == 0 {
return p, fmt.Errorf("no process found")
}

return p, nil
Expand Down Expand Up @@ -271,6 +303,7 @@ func New(path string) gpu.GPU {
n.wrProcess = &writerProcess{
ch: make(chan Process, 32),
terminator: []byte("\n"),
matcher: newProcessMatcher(),
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -379,12 +412,14 @@ func (n *nvidia) runProcessOnce(path string) (map[int32]Process, error) {
return nil, err
}

matcher := newProcessMatcher()

lines := bytes.Split(data.Bytes(), []byte{'\n'})

process := map[int32]Process{}

for _, line := range lines {
p, err := parseProcess(line)
p, err := matcher.Parse(line)
if err != nil {
continue
}
Expand Down
106 changes: 102 additions & 4 deletions resources/psutil/gpu/nvidia/nvidia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,47 @@ func TestParseQuery(t *testing.T) {
}, nv)
}

// TestProcessMatcher verifies that a header line populates the column mapping
// and that a following data line is decoded according to that mapping.
func TestProcessMatcher(t *testing.T) {
	m := newProcessMatcher()

	// A header line is still reported as an error (it carries no process),
	// but the column positions must have been recorded.
	header := []byte("# gpu pid type sm mem enc dec fb command")
	_, headerErr := m.Parse(header)
	require.Error(t, headerErr)

	wantMapping := map[string]int{
		"gpu":     0,
		"pid":     1,
		"type":    2,
		"sm":      3,
		"mem":     4,
		"enc":     5,
		"dec":     6,
		"fb":      7,
		"command": 8,
	}
	require.Equal(t, wantMapping, m.mapping)

	// A data line with leading and trailing whitespace.
	row := []byte(" 0 7372 C 42 1 12 34 136 ffmpeg ")
	got, rowErr := m.Parse(row)
	require.NoError(t, rowErr)

	wantProcess := Process{
		Index:   0,
		PID:     7372,
		Memory:  136 * 1024 * 1024,
		Usage:   42,
		Encoder: 12,
		Decoder: 34,
	}
	require.Equal(t, wantProcess, got)
}

func TestParseProcess(t *testing.T) {
data, err := os.ReadFile("./fixtures/process.txt")
data, err := os.ReadFile("./fixtures/process_1.txt")
require.NoError(t, err)

matcher := newProcessMatcher()
lines := bytes.Split(data, []byte("\n"))
process := map[int32]Process{}

for _, line := range lines {
p, err := parseProcess(line)
p, err := matcher.Parse(line)
if err != nil {
continue
}
Expand Down Expand Up @@ -143,17 +175,44 @@ func TestParseProcess(t *testing.T) {
Decoder: 1,
},
}, process)

data, err = os.ReadFile("./fixtures/process_2.txt")
require.NoError(t, err)

lines = bytes.Split(data, []byte("\n"))
process = map[int32]Process{}

for _, line := range lines {
p, err := matcher.Parse(line)
if err != nil {
continue
}

process[p.PID] = p
}

require.Equal(t, map[int32]Process{
3908637: {
Index: 0,
PID: 3908637,
Memory: 1209 * 1024 * 1024,
Usage: 5,
Encoder: 3,
Decoder: 0,
},
}, process)
}

func TestParseProcessNoProcesses(t *testing.T) {
data, err := os.ReadFile("./fixtures/process_noprocesses.txt")
require.NoError(t, err)

matcher := newProcessMatcher()
lines := bytes.Split(data, []byte("\n"))
process := map[int32]Process{}

for _, line := range lines {
p, err := parseProcess(line)
p, err := matcher.Parse(line)
if err != nil {
continue
}
Expand Down Expand Up @@ -219,12 +278,13 @@ func TestWriterQuery(t *testing.T) {
}

func TestWriterProcess(t *testing.T) {
data, err := os.ReadFile("./fixtures/process.txt")
data, err := os.ReadFile("./fixtures/process_1.txt")
require.NoError(t, err)

wr := &writerProcess{
ch: make(chan Process, 32),
terminator: []byte("\n"),
matcher: newProcessMatcher(),
}

process := map[int32]Process{}
Expand Down Expand Up @@ -287,6 +347,44 @@ func TestWriterProcess(t *testing.T) {
Decoder: 1,
},
}, process)

data, err = os.ReadFile("./fixtures/process_2.txt")
require.NoError(t, err)

wr = &writerProcess{
ch: make(chan Process, 32),
terminator: []byte("\n"),
matcher: newProcessMatcher(),
}

process = map[int32]Process{}
wg = sync.WaitGroup{}
wg.Add(1)

go func() {
defer wg.Done()
for p := range wr.ch {
process[p.PID] = p
}
}()

_, err = wr.Write(data)
require.NoError(t, err)

close(wr.ch)

wg.Wait()

require.Equal(t, map[int32]Process{
3908637: {
Index: 0,
PID: 3908637,
Memory: 1209 * 1024 * 1024,
Usage: 5,
Encoder: 3,
Decoder: 0,
},
}, process)
}

func TestNvidiaGPUCount(t *testing.T) {
Expand Down

0 comments on commit 64a2136

Please sign in to comment.