Skip to content

Commit

Permalink
agent: add initial debuginfo agent command to dgraph
Browse files Browse the repository at this point in the history
Signed-off-by: fristonio <[email protected]>
  • Loading branch information
fristonio committed Dec 23, 2019
1 parent b36859e commit 6093f3c
Show file tree
Hide file tree
Showing 5 changed files with 510 additions and 1 deletion.
165 changes: 165 additions & 0 deletions dgraph/cmd/agent/debuginfo/archive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright 2019-2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package debuginfo

import (
"archive/tar"
"compress/gzip"
"fmt"
"io"
"os"
"path/filepath"
"strings"

"github.com/golang/glog"
)

type tarWriter interface {
io.Writer
WriteHeader(hdr *tar.Header) error
}

type walker struct {
baseDir, debugDir string
output tarWriter
}

func newWalker(baseDir, debugDir string, output tarWriter) *walker {
return &walker{
baseDir: baseDir,
debugDir: debugDir,
output: output,
}
}

func (w *walker) walkPath(path string, info os.FileInfo, err error) error {
if err != nil {
glog.Errorf("Error while walking path %s: %s", path, err)
return nil
}
if info == nil {
glog.Errorf("No file info available")
return nil
}

file, err := os.Open(path)
if err != nil {
glog.Errorf("Failed to open %s: %s", path, err)
return nil
}
defer file.Close()

if info.IsDir() {
if info.Name() == w.baseDir {
return nil
}
glog.Errorf("Skipping directory %s", info.Name())
return nil
}

// Just get the latest fileInfo to make sure that the size is correctly
// when the file is write to tar file
fpInfo, err := file.Stat()
if err != nil {
fpInfo, err = os.Lstat(file.Name())
if err != nil {
glog.Errorf("Failed to retrieve file information: %s", err)
return nil
}
}

header, err := tar.FileInfoHeader(fpInfo, fpInfo.Name())
if err != nil {
glog.Errorf("Failed to prepare file info %s: %s", fpInfo.Name(), err)
return nil
}

if w.baseDir != "" {
header.Name = filepath.Join(w.baseDir, strings.TrimPrefix(path, w.debugDir))
}

if err := w.output.WriteHeader(header); err != nil {
glog.Errorf("Failed to write header: %s", err)
return nil
}

_, err = io.Copy(w.output, file)
return err
}

// createArchive creates a gzipped tar archive for the directory provided
// by recursively traversing in the directory.
// The final tar is placed in the same directory with the name same to the
// archived directory.
func createArchive(debugDir string) (string, error) {
archivePath := fmt.Sprintf("%s.tar", filepath.Base(debugDir))
file, err := os.Create(archivePath)
if err != nil {
return "", err
}
defer file.Close()

writer := tar.NewWriter(file)
defer writer.Close()

var baseDir string
if info, err := os.Stat(debugDir); os.IsNotExist(err) {
return "", err
} else if err == nil && info.IsDir() {
baseDir = filepath.Base(debugDir)
}

walker := newWalker(baseDir, debugDir, writer)
return archivePath, filepath.Walk(debugDir, walker.walkPath)
}

// Creates a Gzipped tar archive of the directory provided as parameter.
func createGzipArchive(debugDir string) (string, error) {
source, err := createArchive(debugDir)
if err != nil {
return "", err
}

reader, err := os.Open(source)
if err != nil {
return "", err
}

filename := filepath.Base(source)
target := fmt.Sprintf("%s.gz", source)
writer, err := os.Create(target)
if err != nil {
return "", err
}
defer writer.Close()

archiver := gzip.NewWriter(writer)
archiver.Name = filename
defer archiver.Close()

_, err = io.Copy(archiver, reader)
if err != nil {
return "", err
}

err = os.Remove(source)
if err != nil {
glog.Warningf("error while removing intermediate tar file: %s", err)
}

return target, nil
}
174 changes: 174 additions & 0 deletions dgraph/cmd/agent/debuginfo/pprof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright 2019-2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package debuginfo

import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/golang/glog"
)

type pprofCollector struct {
host string
baseDir string
filePrefix string
duration time.Duration
timeout time.Duration
tr http.RoundTripper
}

var profileTypes = []string{"goroutine", "heap", "threadcreate", "block", "mutex", "profile", "trace"}

func newPprofCollector(host, baseDir, filePrefix string, duration time.Duration) *pprofCollector {
timeout := duration + duration/2

var transport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: timeout,
KeepAlive: timeout,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}

return &pprofCollector{
host: host,
baseDir: baseDir,
filePrefix: filePrefix,
duration: duration,
timeout: timeout,
tr: transport,
}
}

// Collect all the profiles and save them to the directory specified in baseDir.
func (c *pprofCollector) Collect() {
for _, pType := range profileTypes {
src, err := c.saveProfile(pType)
if err != nil {
glog.Errorf("error while saving pprof profile from %s: %s", src, err)
continue
}

glog.Infof("%s profile saved in %s", pType, src)
}
}

// saveProfile writes the profile specified in the argument fetching it from the host
// provided in the configuration
func (c *pprofCollector) saveProfile(profileType string) (src string, err error) {
var resp io.ReadCloser
source := fmt.Sprintf("%s/debug/pprof/%s", c.host, profileType)

if sourceURL, timeout := adjustURL(source, c.duration, c.timeout); sourceURL != "" {
glog.Info("Fetching profile over HTTP from " + sourceURL)
if c.duration > 0 {
glog.Info(fmt.Sprintf("Please wait... (%v)", c.duration))
}
resp, err = fetchURL(sourceURL, timeout, c.tr)
src = sourceURL
}
if err != nil {
return
}

defer resp.Close()
out, err := os.Create(filepath.Join(c.baseDir, fmt.Sprintf("%s%s.gz", c.filePrefix, profileType)))
if err != nil {
return
}
_, err = io.Copy(out, resp)
return
}

// fetchURL fetches a profile from a URL using HTTP.
func fetchURL(source string, timeout time.Duration, tr http.RoundTripper) (io.ReadCloser, error) {
client := &http.Client{
Transport: tr,
Timeout: timeout + 5*time.Second,
}
resp, err := client.Get(source)
if err != nil {
return nil, fmt.Errorf("http fetch: %v", err)
}
if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
return nil, statusCodeError(resp)
}

return resp.Body, nil
}

func statusCodeError(resp *http.Response) error {
if resp.Header.Get("X-Go-Pprof") != "" && strings.Contains(resp.Header.Get("Content-Type"), "text/plain") {
// error is from pprof endpoint
if body, err := ioutil.ReadAll(resp.Body); err == nil {
return fmt.Errorf("server response: %s - %s", resp.Status, body)
}
}
return fmt.Errorf("server response: %s", resp.Status)
}

// adjustURL validates if a profile source is a URL and returns an
// cleaned up URL and the timeout to use for retrieval over HTTP.
// If the source cannot be recognized as a URL it returns an empty string.
func adjustURL(source string, duration, timeout time.Duration) (string, time.Duration) {
u, err := url.Parse(source)
if err != nil || (u.Host == "" && u.Scheme != "" && u.Scheme != "file") {
// Try adding http:// to catch sources of the form hostname:port/path.
// url.Parse treats "hostname" as the scheme.
u, err = url.Parse("http://" + source)
}
if err != nil || u.Host == "" {
return "", 0
}

// Apply duration/timeout overrides to URL.
values := u.Query()
if duration > 0 {
values.Set("seconds", fmt.Sprint(int(duration.Seconds())))
} else {
if urlSeconds := values.Get("seconds"); urlSeconds != "" {
if us, err := strconv.ParseInt(urlSeconds, 10, 32); err == nil {
duration = time.Duration(us) * time.Second
}
}
}
if timeout <= 0 {
if duration > 0 {
timeout = duration + duration/2
} else {
timeout = 60 * time.Second
}
}
u.RawQuery = values.Encode()
return u.String(), timeout
}
Loading

0 comments on commit 6093f3c

Please sign in to comment.