1 Star 0 Fork 0

sqos/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
procs.go 9.46 KB
一键复制 编辑 原始数据 按行查看 历史
package procs
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
type socketInfo struct {
srcIP, dstIP net.IP
srcPort, dstPort uint16
uid uint32
inode uint64
}
type portProcMapping struct {
port uint16
pid int
proc *process
}
type process struct {
name string
grepper string
pids []int
proc *ProcessesWatcher
refreshPidsTimer <-chan time.Time
}
type ProcessesWatcher struct {
portProcMap map[uint16]portProcMapping
lastMapUpdate time.Time
processes []*process
localAddrs []net.IP
// config
readFromProc bool
maxReadFreq time.Duration
refreshPidsFreq time.Duration
// test helpers
procPrefix string
testSignals *chan bool
}
var ProcWatcher ProcessesWatcher
func (proc *ProcessesWatcher) Init(config ProcsConfig) error {
proc.procPrefix = ""
proc.portProcMap = make(map[uint16]portProcMapping)
proc.lastMapUpdate = time.Now()
proc.readFromProc = config.Enabled
if proc.readFromProc {
if runtime.GOOS != "linux" {
proc.readFromProc = false
logp.Info("Disabled /proc/ reading because not on linux")
} else {
logp.Info("Process matching enabled")
}
} else {
logp.Info("Process matching disabled")
}
if config.MaxProcReadFreq == 0 {
proc.maxReadFreq = 10 * time.Millisecond
} else {
proc.maxReadFreq = config.MaxProcReadFreq
}
if config.RefreshPidsFreq == 0 {
proc.refreshPidsFreq = 1 * time.Second
} else {
proc.refreshPidsFreq = config.RefreshPidsFreq
}
// Read the local IP addresses
var err error
proc.localAddrs, err = common.LocalIPAddrs()
if err != nil {
logp.Err("Error getting local IP addresses: %s", err)
proc.localAddrs = []net.IP{}
}
if proc.readFromProc {
for _, procConfig := range config.Monitored {
grepper := procConfig.CmdlineGrep
if len(grepper) == 0 {
grepper = procConfig.Process
}
p, err := newProcess(proc, procConfig.Process, grepper, time.Tick(proc.refreshPidsFreq))
if err != nil {
logp.Err("NewProcess: %s", err)
} else {
proc.processes = append(proc.processes, p)
}
}
}
return nil
}
func newProcess(proc *ProcessesWatcher, name string, grepper string,
refreshPidsTimer <-chan time.Time) (*process, error) {
p := &process{name: name, proc: proc, grepper: grepper,
refreshPidsTimer: refreshPidsTimer}
// start periodic timer in its own goroutine
go p.refreshPids()
return p, nil
}
func (p *process) refreshPids() {
logp.Debug("procs", "In RefreshPids")
for range p.refreshPidsTimer {
logp.Debug("procs", "In RefreshPids tick")
var err error
p.pids, err = findPidsByCmdlineGrep(p.proc.procPrefix, p.grepper)
if err != nil {
logp.Err("Error finding PID files for %s: %s", p.name, err)
}
logp.Debug("procs", "RefreshPids found pids %s for process %s", p.pids, p.name)
if p.proc.testSignals != nil {
*p.proc.testSignals <- true
}
}
}
func findPidsByCmdlineGrep(prefix string, process string) ([]int, error) {
defer logp.Recover("FindPidsByCmdlineGrep exception")
pids := []int{}
proc, err := os.Open(filepath.Join(prefix, "/proc"))
if err != nil {
return pids, fmt.Errorf("Open /proc: %s", err)
}
defer proc.Close()
names, err := proc.Readdirnames(0)
if err != nil {
return pids, fmt.Errorf("Readdirnames: %s", err)
}
for _, name := range names {
pid, err := strconv.Atoi(name)
if err != nil {
continue
}
cmdline, err := ioutil.ReadFile(filepath.Join(prefix, "/proc/", name, "cmdline"))
if err != nil {
continue
}
if strings.Index(string(cmdline), process) >= 0 {
pids = append(pids, pid)
}
}
return pids, nil
}
func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) {
procTuple = &common.CmdlineTuple{}
if !proc.readFromProc {
return
}
if proc.isLocalIP(tuple.SrcIP) {
logp.Debug("procs", "Looking for port %d", tuple.SrcPort)
procTuple.Src = []byte(proc.findProc(tuple.SrcPort))
if len(procTuple.Src) > 0 {
logp.Debug("procs", "Found device %s for port %d", procTuple.Src, tuple.SrcPort)
}
}
if proc.isLocalIP(tuple.DstIP) {
logp.Debug("procs", "Looking for port %d", tuple.DstPort)
procTuple.Dst = []byte(proc.findProc(tuple.DstPort))
if len(procTuple.Dst) > 0 {
logp.Debug("procs", "Found device %s for port %d", procTuple.Dst, tuple.DstPort)
}
}
return
}
func (proc *ProcessesWatcher) findProc(port uint16) (procname string) {
procname = ""
defer logp.Recover("FindProc exception")
p, exists := proc.portProcMap[port]
if exists {
return p.proc.name
}
now := time.Now()
if now.Sub(proc.lastMapUpdate) > proc.maxReadFreq {
proc.lastMapUpdate = now
proc.updateMap()
// try again
p, exists := proc.portProcMap[port]
if exists {
return p.proc.name
}
}
return ""
}
func hexToIpv4(word string) (net.IP, error) {
ip, err := strconv.ParseInt(word, 16, 64)
if err != nil {
return nil, err
}
return net.IPv4(byte(ip), byte(ip>>8), byte(ip>>16), byte(ip>>24)), nil
}
func hexToIpv6(word string) (net.IP, error) {
p := make(net.IP, net.IPv6len)
for i := 0; i < 4; i++ {
part, err := strconv.ParseUint(word[i*8:(i+1)*8], 16, 32)
if err != nil {
return nil, err
}
p[i*4] = byte(part)
p[i*4+1] = byte(part >> 8)
p[i*4+2] = byte(part >> 16)
p[i*4+3] = byte(part >> 24)
}
return p, nil
}
func hexToIP(word string, ipv6 bool) (net.IP, error) {
if ipv6 {
return hexToIpv6(word)
}
return hexToIpv4(word)
}
func hexToIPPort(str []byte, ipv6 bool) (net.IP, uint16, error) {
words := bytes.Split(str, []byte(":"))
if len(words) < 2 {
return nil, 0, errors.New("Didn't find ':' as a separator")
}
ip, err := hexToIP(string(words[0]), ipv6)
if err != nil {
return nil, 0, err
}
port, err := strconv.ParseInt(string(words[1]), 16, 32)
if err != nil {
return nil, 0, err
}
return ip, uint16(port), nil
}
func (proc *ProcessesWatcher) updateMap() {
logp.Debug("procs", "UpdateMap()")
ipv4socks, err := socketsFromProc("/proc/net/tcp", false)
if err != nil {
logp.Err("Parse_Proc_Net_Tcp: %s", err)
return
}
ipv6socks, err := socketsFromProc("/proc/net/tcp6", true)
if err != nil {
logp.Err("Parse_Proc_Net_Tcp ipv6: %s", err)
return
}
socksMap := map[uint64]*socketInfo{}
for _, s := range ipv4socks {
socksMap[s.inode] = s
}
for _, s := range ipv6socks {
socksMap[s.inode] = s
}
for _, p := range proc.processes {
for _, pid := range p.pids {
inodes, err := findSocketsOfPid(proc.procPrefix, pid)
if err != nil {
logp.Err("FindSocketsOfPid: %s", err)
continue
}
for _, inode := range inodes {
sockInfo, exists := socksMap[inode]
if exists {
proc.updateMappingEntry(sockInfo.srcPort, pid, p)
}
}
}
}
}
func socketsFromProc(filename string, ipv6 bool) ([]*socketInfo, error) {
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer file.Close()
return parseProcNetTCP(file, ipv6)
}
// Parses the /proc/net/tcp file
func parseProcNetTCP(input io.Reader, ipv6 bool) ([]*socketInfo, error) {
buf := bufio.NewReader(input)
sockets := []*socketInfo{}
var err error
var line []byte
for err != io.EOF {
line, err = buf.ReadBytes('\n')
if err != nil && err != io.EOF {
logp.Err("Error reading /proc/net/tcp: %s", err)
return nil, err
}
words := bytes.Fields(line)
if len(words) < 10 || bytes.Equal(words[0], []byte("sl")) {
logp.Debug("procs", "Less then 10 words (%d) or starting with 'sl': %s", len(words), words)
continue
}
var sock socketInfo
var err error
sock.srcIP, sock.srcPort, err = hexToIPPort(words[1], ipv6)
if err != nil {
logp.Debug("procs", "Error parsing IP and port: %s", err)
continue
}
sock.dstIP, sock.dstPort, err = hexToIPPort(words[2], ipv6)
if err != nil {
logp.Debug("procs", "Error parsing IP and port: %s", err)
continue
}
uid, _ := strconv.Atoi(string(words[7]))
sock.uid = uint32(uid)
inode, _ := strconv.Atoi(string(words[9]))
sock.inode = uint64(inode)
sockets = append(sockets, &sock)
}
return sockets, nil
}
func (proc *ProcessesWatcher) updateMappingEntry(port uint16, pid int, p *process) {
entry := portProcMapping{port: port, pid: pid, proc: p}
// Simply overwrite old entries for now.
// We never expire entries from this map. Since there are 65k possible
// ports, the size of the dict can be max 1.5 MB, which we consider
// reasonable.
proc.portProcMap[port] = entry
logp.Debug("procsdetailed", "UpdateMappingEntry(): port=%d pid=%d", port, p.name)
}
func findSocketsOfPid(prefix string, pid int) (inodes []uint64, err error) {
dirname := filepath.Join(prefix, "/proc", strconv.Itoa(pid), "fd")
procfs, err := os.Open(dirname)
if err != nil {
return []uint64{}, fmt.Errorf("Open: %s", err)
}
defer procfs.Close()
names, err := procfs.Readdirnames(0)
if err != nil {
return []uint64{}, fmt.Errorf("Readdirnames: %s", err)
}
for _, name := range names {
link, err := os.Readlink(filepath.Join(dirname, name))
if err != nil {
logp.Debug("procs", "Readlink %s: %s", name, err)
continue
}
if strings.HasPrefix(link, "socket:[") {
inode, err := strconv.ParseInt(link[8:len(link)-1], 10, 64)
if err != nil {
logp.Debug("procs", "ParseInt: %s:", err)
continue
}
inodes = append(inodes, uint64(inode))
}
}
return inodes, nil
}
func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool {
if ip.IsLoopback() {
return true
}
for _, addr := range proc.localAddrs {
if ip.Equal(addr) {
return true
}
}
return false
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sqos/beats.git
git@gitee.com:sqos/beats.git
sqos
beats
beats
v5.6.12

搜索帮助