Commit dff22c05 authored by Azat Khuziyakhmetov's avatar Azat Khuziyakhmetov
Browse files

forked and migrated to the procstat in telegraf v1.10.3. Fixes #1

parent 6f45c0de
# PFIT-uprocstat Input Plugin
The pfit-uprocstat plugin is a modification of the Telegraf procstat input
plugin for efficient monitoring of all user processes and was written by
Azat Khuziyakhmetov
azat.khuziyakhmetov@gwdg.de
Additional modifications to add job IDs of various batch systems (Torque,
Moab, LSF, Slurm) as tags from /proc/<pid>/environ were done by
Guido Laubender, laubender@zib.de, +49 30 84185-214
......@@ -24,9 +24,18 @@ Guido Laubender, laubender@zib.de, +49 30 84185-214
# pattern = "nginx"
## user as argument for pgrep (ie, pgrep -u <user>)
# user = "nginx"
## Systemd unit name
# systemd_unit = "nginx.service"
## CGroup name or path
# cgroup = "systemd/system.slice/nginx.service"
## Windows service name
# win_service = ""
## override for process_name
## This is optional; default is sourced from /proc/<pid>/status
# process_name = "bar"
## Field name prefix
# prefix = ""
## comment this out if you want raw cpu_time stats
......@@ -38,16 +47,102 @@ Guido Laubender, laubender@zib.de, +49 30 84185-214
start_from = 1000
## the list of users to be ignored
ignore_users = ["crayadm", "bmon"]
## Method to use when finding process IDs. Can be one of 'pgrep', or
## 'native'. The pgrep finder calls the pgrep executable in the PATH while
## the native finder performs the search directly in a manner dependent on the
## platform. Default is 'pgrep'
# pid_finder = "pgrep"
```
### Tags:
### Metrics:
The job IDs are added as jobid1 and jobid2 tags by this input plugin.
- procstat
- tags:
- pid (when `pid_tag` is true)
- process_name
- pidfile (when defined)
- exe (when defined)
- pattern (when defined)
- user (when selected)
- systemd_unit (when defined)
- cgroup (when defined)
- win_service (when defined)
- fields:
- cpu_time (int)
- cpu_time_guest (float)
- cpu_time_guest_nice (float)
- cpu_time_idle (float)
- cpu_time_iowait (float)
- cpu_time_irq (float)
- cpu_time_nice (float)
- cpu_time_soft_irq (float)
- cpu_time_steal (float)
- cpu_time_stolen (float)
- cpu_time_system (float)
- cpu_time_user (float)
- cpu_usage (float)
- involuntary_context_switches (int)
- memory_data (int)
- memory_locked (int)
- memory_rss (int)
- memory_stack (int)
- memory_swap (int)
- memory_vms (int)
- nice_priority (int)
- num_fds (int, *telegraf* may need to be run as **root**)
- num_threads (int)
- pid (int)
- read_bytes (int, *telegraf* may need to be run as **root**)
- read_count (int, *telegraf* may need to be run as **root**)
- realtime_priority (int)
- rlimit_cpu_time_hard (int)
- rlimit_cpu_time_soft (int)
- rlimit_file_locks_hard (int)
- rlimit_file_locks_soft (int)
- rlimit_memory_data_hard (int)
- rlimit_memory_data_soft (int)
- rlimit_memory_locked_hard (int)
- rlimit_memory_locked_soft (int)
- rlimit_memory_rss_hard (int)
- rlimit_memory_rss_soft (int)
- rlimit_memory_stack_hard (int)
- rlimit_memory_stack_soft (int)
- rlimit_memory_vms_hard (int)
- rlimit_memory_vms_soft (int)
- rlimit_nice_priority_hard (int)
- rlimit_nice_priority_soft (int)
- rlimit_num_fds_hard (int)
- rlimit_num_fds_soft (int)
- rlimit_realtime_priority_hard (int)
- rlimit_realtime_priority_soft (int)
- rlimit_signals_pending_hard (int)
- rlimit_signals_pending_soft (int)
- signals_pending (int)
- voluntary_context_switches (int)
- write_bytes (int, *telegraf* may need to be run as **root**)
- write_count (int, *telegraf* may need to be run as **root**)
- procstat_lookup
- tags:
- exe
- pid_finder
- pid_file
- pattern
- prefix
- user
- systemd_unit
- cgroup
- win_service
- result
- fields:
- pid_count (int)
- running (int)
- result_code (int, success = 0, lookup_error = 1)
### Author:
*NOTE: Resource limit > 2147483647 will be reported as 2147483647.*
Guido Laubender, laubender@zib.de, +49 30 84185-214
### Example Output:
### Version:
2018-08-07
```
procstat,pidfile=/var/run/lxc/dnsmasq.pid,process_name=dnsmasq rlimit_file_locks_soft=2147483647i,rlimit_signals_pending_hard=1758i,voluntary_context_switches=478i,read_bytes=307200i,cpu_time_user=0.01,cpu_time_guest=0,memory_swap=0i,memory_locked=0i,rlimit_num_fds_hard=4096i,rlimit_nice_priority_hard=0i,num_fds=11i,involuntary_context_switches=20i,read_count=23i,memory_rss=1388544i,rlimit_memory_rss_soft=2147483647i,rlimit_memory_rss_hard=2147483647i,nice_priority=20i,rlimit_cpu_time_hard=2147483647i,cpu_time=0i,write_bytes=0i,cpu_time_idle=0,cpu_time_nice=0,memory_data=229376i,memory_stack=135168i,rlimit_cpu_time_soft=2147483647i,rlimit_memory_data_hard=2147483647i,rlimit_memory_locked_hard=65536i,rlimit_signals_pending_soft=1758i,write_count=11i,cpu_time_iowait=0,cpu_time_steal=0,cpu_time_stolen=0,rlimit_memory_stack_soft=8388608i,cpu_time_system=0.02,cpu_time_guest_nice=0,rlimit_memory_locked_soft=65536i,rlimit_memory_vms_soft=2147483647i,rlimit_file_locks_hard=2147483647i,rlimit_realtime_priority_hard=0i,pid=828i,num_threads=1i,cpu_time_soft_irq=0,rlimit_memory_vms_hard=2147483647i,rlimit_realtime_priority_soft=0i,memory_vms=15884288i,rlimit_memory_stack_hard=2147483647i,cpu_time_irq=0,rlimit_memory_data_soft=2147483647i,rlimit_num_fds_soft=1024i,signals_pending=0i,rlimit_nice_priority_soft=0i,realtime_priority=0i
procstat,exe=influxd,process_name=influxd rlimit_num_fds_hard=16384i,rlimit_signals_pending_hard=1758i,realtime_priority=0i,rlimit_memory_vms_hard=2147483647i,rlimit_signals_pending_soft=1758i,cpu_time_stolen=0,rlimit_memory_stack_hard=2147483647i,rlimit_realtime_priority_hard=0i,cpu_time=0i,pid=500i,voluntary_context_switches=975i,cpu_time_idle=0,memory_rss=3072000i,memory_locked=0i,rlimit_nice_priority_soft=0i,signals_pending=0i,nice_priority=20i,read_bytes=823296i,cpu_time_soft_irq=0,rlimit_memory_data_hard=2147483647i,rlimit_memory_locked_soft=65536i,write_count=8i,cpu_time_irq=0,memory_vms=33501184i,rlimit_memory_stack_soft=8388608i,cpu_time_iowait=0,rlimit_memory_vms_soft=2147483647i,rlimit_nice_priority_hard=0i,num_fds=29i,memory_data=229376i,rlimit_cpu_time_soft=2147483647i,rlimit_file_locks_soft=2147483647i,num_threads=1i,write_bytes=0i,cpu_time_steal=0,rlimit_memory_rss_hard=2147483647i,cpu_time_guest=0,cpu_time_guest_nice=0,cpu_usage=0,rlimit_memory_locked_hard=65536i,rlimit_file_locks_hard=2147483647i,involuntary_context_switches=38i,read_count=16851i,memory_swap=0i,rlimit_memory_data_soft=2147483647i,cpu_time_user=0.11,rlimit_cpu_time_hard=2147483647i,rlimit_num_fds_soft=16384i,rlimit_realtime_priority_soft=0i,cpu_time_system=0.27,cpu_time_nice=0,memory_stack=135168i,rlimit_memory_rss_soft=2147483647i
```
[agent]
interval="1s"
flush_interval="1s"
[[inputs.procstat]]
exe = "telegraf"
[[outputs.file]]
files = ["stdout"]
package pfituprocstat
import (
"fmt"
"io/ioutil"
"strconv"
"strings"
"github.com/shirou/gopsutil/process"
)
// NativeFinder is a stateless process finder; most of its lookup methods
// enumerate processes through the gopsutil process package.
type NativeFinder struct{}
// NewNativeFinder allocates a NativeFinder and returns it as a PIDFinder.
// The error result is always nil.
func NewNativeFinder() (PIDFinder, error) {
	finder := new(NativeFinder)
	return finder, nil
}
// Uid collects the PIDs of every process whose owner name equals user.
//
// A non-nil error is returned only when the process list itself cannot be
// obtained. Processes whose owner cannot be resolved are skipped.
func (pg *NativeFinder) Uid(user string) ([]PID, error) {
	all, err := process.Processes()
	if err != nil {
		return nil, err
	}

	var matched []PID
	for i := range all {
		owner, lookupErr := all[i].Username()
		// A failed lookup can happen when we lack permissions or the
		// process has already exited; either way it is not a match.
		if lookupErr != nil || owner != user {
			continue
		}
		matched = append(matched, PID(all[i].Pid))
	}
	return matched, nil
}
// PidFile reads the file at path, strips surrounding whitespace and parses
// what remains as a decimal process ID.
//
// On success the result is a one-element slice holding that PID. If the file
// cannot be read, a descriptive error naming the path is returned; if the
// content is not an integer, the strconv error is returned unchanged. In both
// failure cases the slice is nil.
func (pg *NativeFinder) PidFile(path string) ([]PID, error) {
	raw, readErr := ioutil.ReadFile(path)
	if readErr != nil {
		return nil, fmt.Errorf("Failed to read pidfile '%s'. Error: '%s'", path, readErr)
	}

	text := strings.TrimSpace(string(raw))
	number, convErr := strconv.Atoi(text)
	if convErr != nil {
		return nil, convErr
	}
	return []PID{PID(number)}, nil
}
// FindAll returns the PIDs of all processes that pass two filters:
//
//   - the first UID reported for the process is at least start_from
//     (presumably the real UID; confirm against the gopsutil docs), and
//   - the process owner's user name is not listed in ignore_users.
//
// A process whose UIDs cannot be read is treated as having UID -1, so it is
// dropped whenever start_from is greater than -1. Processes whose user name
// cannot be resolved are skipped as well. A non-nil error is returned only
// when the process list itself cannot be obtained.
func (pg *NativeFinder) FindAll(ignore_users []string, start_from int) ([]PID, error) {
	procs, err := process.Processes()
	if err != nil {
		return nil, err
	}

	var pids []PID
	for _, p := range procs {
		// -1 stands for "UID unknown".
		uid := -1
		// Check the length before indexing so that an empty UID list
		// returned without an error cannot cause a panic.
		if uids, err := p.Uids(); err == nil && len(uids) > 0 {
			uid = int(uids[0])
		}

		// Skip users with IDs lower than intended.
		if uid < start_from {
			continue
		}

		user, err := p.Username()
		if err != nil {
			// Skip: no permissions, or the PID no longer exists.
			continue
		}

		// Skip processes owned by an ignored user.
		if pg.isInArray(user, ignore_users) {
			continue
		}

		// Otherwise monitor it.
		pids = append(pids, PID(p.Pid))
	}
	return pids, nil
}
// isInArray reports whether list contains an element exactly equal to a.
// The comparison is case-sensitive; a nil or empty list never matches.
func (pg *NativeFinder) isInArray(a string, list []string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == a {
			return true
		}
	}
	return false
}
// +build !windows
package pfituprocstat
import (
"regexp"
"github.com/shirou/gopsutil/process"
)
// Pattern compiles pattern as a regular expression and returns the PIDs of
// all processes for which the value reported by Exe() matches it.
//
// An error is returned when the pattern does not compile or the process list
// cannot be obtained. Processes whose Exe() call fails are skipped.
func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) {
	matcher, compileErr := regexp.Compile(pattern)
	if compileErr != nil {
		return nil, compileErr
	}

	running, listErr := process.Processes()
	if listErr != nil {
		return nil, listErr
	}

	var hits []PID
	for i := range running {
		exe, exeErr := running[i].Exe()
		if exeErr != nil {
			// The PID may no longer exist, or we may lack the
			// permissions to inspect it.
			continue
		}
		if matcher.MatchString(exe) {
			hits = append(hits, PID(running[i].Pid))
		}
	}
	return hits, nil
}
// FullPattern compiles pattern as a regular expression and returns the PIDs
// of all processes whose command line, as reported by Cmdline(), matches it.
//
// An error is returned when the pattern does not compile or the process list
// cannot be obtained. Processes whose Cmdline() call fails are skipped.
func (pg *NativeFinder) FullPattern(pattern string) ([]PID, error) {
	matcher, compileErr := regexp.Compile(pattern)
	if compileErr != nil {
		return nil, compileErr
	}

	running, listErr := process.Processes()
	if listErr != nil {
		return nil, listErr
	}

	var hits []PID
	for i := range running {
		cmdline, cmdErr := running[i].Cmdline()
		if cmdErr != nil {
			// The PID may no longer exist, or we may lack the
			// permissions to inspect it.
			continue
		}
		if matcher.MatchString(cmdline) {
			hits = append(hits, PID(running[i].Pid))
		}
	}
	return hits, nil
}
package pfituprocstat
import (
"context"
"fmt"
"regexp"
"time"
"github.com/StackExchange/wmi"
"github.com/shirou/gopsutil/process"
)
// Timeout is the timeout used when making WMI calls.
var Timeout = time.Second * 5

// queryType is the comparison operator placed between a variable and its
// value in a WMI WHERE clause.
type queryType string

// Comparison operators available for WMI WHERE clauses.
const (
	like     queryType = "LIKE"
	equals   queryType = "="
	notEqual queryType = "!="
)
// Pattern compiles pattern as a regular expression and returns the PIDs of
// all processes whose name, as reported by Name(), matches it.
//
// An error is returned when the pattern does not compile or the process list
// cannot be obtained. Processes whose Name() call fails are skipped.
func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) {
	matcher, compileErr := regexp.Compile(pattern)
	if compileErr != nil {
		return nil, compileErr
	}

	running, listErr := process.Processes()
	if listErr != nil {
		return nil, listErr
	}

	var hits []PID
	for i := range running {
		procName, nameErr := running[i].Name()
		if nameErr != nil {
			// The PID may no longer exist, or we may lack the
			// permissions to inspect it.
			continue
		}
		if matcher.MatchString(procName) {
			hits = append(hits, PID(running[i].Pid))
		}
	}
	return hits, nil
}
// FullPattern returns the PIDs of all processes whose CommandLine matches
// pattern in a WMI LIKE query. The query is bounded by the package-level
// Timeout. Any query failure is returned as the error with a nil slice.
func (pg *NativeFinder) FullPattern(pattern string) ([]PID, error) {
	matches, err := getWin32ProcsByVariable("CommandLine", like, pattern, Timeout)
	if err != nil {
		return nil, err
	}

	var found []PID
	for i := range matches {
		found = append(found, PID(matches[i].ProcessID))
	}
	return found, nil
}
// getWin32ProcsByVariable runs a WMI query for Win32_Process rows, filtered
// by a WHERE clause built from variable, the operator qType and value (value
// is quoted with %q). The query is given at most timeout to complete.
//
// On failure it returns an empty, non-nil slice together with a wrapped
// error message.
func getWin32ProcsByVariable(variable string, qType queryType, value string, timeout time.Duration) ([]process.Win32_Process, error) {
	var rows []process.Win32_Process

	// The clause looks like: WHERE CommandLine LIKE "procstat"
	where := fmt.Sprintf("WHERE %s %s %q", variable, qType, value)
	wql := wmi.CreateQuery(&rows, where)

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	if err := WMIQueryWithContext(ctx, wql, &rows); err != nil {
		return []process.Win32_Process{}, fmt.Errorf("could not get win32Proc: %s", err)
	}
	return rows, nil
}
// WMIQueryWithContext runs wmi.Query in a separate goroutine and waits for
// either its result or the cancellation of ctx, whichever comes first, so
// that a hanging WMI call cannot block the caller indefinitely.
//
// It returns the query's error (nil on success), or ctx.Err() if the context
// ends first. In the latter case the query goroutine is not interrupted.
func WMIQueryWithContext(ctx context.Context, query string, dst interface{}, connectServerArgs ...interface{}) error {
	// Capacity 1 lets the goroutine deliver its result and exit even when
	// nobody is receiving any more.
	done := make(chan error, 1)
	go func() {
		done <- wmi.Query(query, dst, connectServerArgs...)
	}()

	select {
	case queryErr := <-done:
		return queryErr
	case <-ctx.Done():
		return ctx.Err()
	}
}
package pfituprocstat
import (
"fmt"
"testing"
"os/user"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestGather_RealPattern is an integration test (skipped in short mode) that
// expects Pattern to find at least one live process matching "procstat".
func TestGather_RealPattern(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	finder, err := NewNativeFinder()
	require.NoError(t, err)

	found, err := finder.Pattern(`procstat`)
	require.NoError(t, err)
	fmt.Println(found)

	hasAny := len(found) > 0
	assert.Equal(t, hasAny, true)
}
// TestGather_RealFullPattern is an integration test (skipped in short mode)
// that expects FullPattern to find at least one live process for the
// pattern "%procstat%".
func TestGather_RealFullPattern(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	finder, err := NewNativeFinder()
	require.NoError(t, err)

	found, err := finder.FullPattern(`%procstat%`)
	require.NoError(t, err)
	fmt.Println(found)

	hasAny := len(found) > 0
	assert.Equal(t, hasAny, true)
}
// TestGather_RealUser is an integration test (skipped in short mode) that
// expects Uid to find at least one process owned by the current user.
func TestGather_RealUser(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	current, err := user.Current()
	require.NoError(t, err)

	finder, err := NewNativeFinder()
	require.NoError(t, err)

	found, err := finder.Uid(current.Username)
	require.NoError(t, err)
	fmt.Println(found)

	hasAny := len(found) > 0
	assert.Equal(t, hasAny, true)
}
package pfituprocstat
import (
"bytes"
"fmt"
"io/ioutil"
"os/exec"
"regexp"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/shirou/gopsutil/process"
)
var (
......@@ -20,6 +24,7 @@ var (
// PID is a process identifier, represented as a signed 32-bit integer.
type PID int32
type Procstat struct {
PidFinder string `toml:"pid_finder"`
PidFile string `toml:"pid_file"`
Exe string
Pattern string
......@@ -28,16 +33,19 @@ type Procstat struct {
User string
IgnoreUsers []string
StartFrom int
SystemdUnit string
CGroup string `toml:"cgroup"`
PidTag bool
WinService string `toml:"win_service"`
finder PIDFinder
pidFinder PIDFinder
createPIDFinder func() (PIDFinder, error)
procs map[PID]Process
createProcess func(PID) (Process, error)
}
var sampleConfig = `
## Must specify one of: pid_file, exe, or pattern
## PID file to monitor process
# pid_file = "/var/run/nginx.pid"
## executable name (ie, pgrep <exe>)
......@@ -46,20 +54,39 @@ var sampleConfig = `
# pattern = "nginx"
## user as argument for pgrep (ie, pgrep -u <user>)
# user = "nginx"
## Systemd unit name
# systemd_unit = "nginx.service"
## CGroup name or path
# cgroup = "systemd/system.slice/nginx.service"
## Windows service name
# win_service = ""
## override for process_name
## This is optional; default is sourced from /proc/<pid>/status
# process_name = "bar"
## Field name prefix
# prefix = ""
## comment this out if you want raw cpu_time stats
# fielddrop = ["cpu_time_*"]
## This is optional; moves pid into a tag instead of a field
## Add PID as a tag instead of a field; useful to differentiate between
## processes whose tags are otherwise the same. Can create a large number
## of series, use judiciously.
# pid_tag = false
## If none of monitoring options is selected. Then we monitor all processes.
## the lowest process UID to be monitored
start_from = 1000
## the list of users to be ignored
#ignore_users = ["influxdb"]
## Method to use when finding process IDs. Can be one of 'pgrep', or
## 'native'. The pgrep finder calls the pgrep executable in the PATH while
## the native finder performs the search directly in a manor dependent on the
## platform. Default is 'pgrep'
# pid_finder = "pgrep"
`
func (_ *Procstat) SampleConfig() string {
......@@ -78,7 +105,22 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error {
p.createProcess = defaultProcess
}
procs, err := p.updateProcesses(p.procs)
pids, tags, err := p.findPids(acc)
if err != nil {
fields := map[string]interface{}{
"pid_count": 0,
"running": 0,
"result_code": 1,
}
tags := map[string]string{
"pid_finder": p.PidFinder,
"result": "lookup_error",
}
acc.AddFields("procstat_lookup", fields, tags)
return err
}
procs, err := p.updateProcesses(pids, tags, p.procs)
if err != nil {
acc.AddError(fmt.Errorf("E! Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s",
p.Exe, p.PidFile, p.Pattern, p.User, err.Error()))
......@@ -86,14 +128,23 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error {
p.procs = procs
for _, proc := range p.procs {
p.addMetrics(proc, acc)
p.addMetric(proc, acc)
}
fields := map[string]interface{}{
"pid_count": len(pids),
"running": len(procs),
"result_code": 0,
}
tags["pid_finder"] = p.PidFinder
tags["result"] = "success"
acc.AddFields("procstat_lookup", fields, tags)
return nil
}
// Add metrics a single Process
func (p *Procstat) addMetrics(proc Process, acc telegraf.Accumulator) {
func (p *Procstat) addMetric(proc Process, acc telegraf.Accumulator) {
var prefix string
if p.Prefix != "" {
prefix = p.Prefix + "_"
......@@ -109,6 +160,14 @@ func (p *Procstat) addMetrics(proc Process, acc telegraf.Accumulator) {
}
}
//If user tag is not already set, set to actual name
if _, ok := proc.Tags()["user"]; !ok {
user, err := proc.Username()
if err == nil {
proc.Tags()["user"] = user
}
}
//If pid is not present as a tag, include it as a field.
if _, pidInTags := proc.Tags()["pid"]; !pidInTags {
fields["pid"] = int32(proc.PID())
......@@ -163,6 +222,9 @@ func (p *Procstat) addMetrics(proc Process, acc telegraf.Accumulator) {
fields[prefix+"memory_rss"] = mem.RSS
fields[prefix+"memory_vms"] = mem.VMS
fields[prefix+"memory_swap"] = mem.Swap
fields[prefix+"memory_data"] = mem.Data
fields[prefix+"memory_stack"] = mem.Stack
fields[prefix+"memory_locked"] = mem.Locked
}
uids, err := proc.Uids()
......@@ -170,6 +232,45 @@ func (p *Procstat) addMetrics(proc Process, acc telegraf.Accumulator) {