1 Star 0 Fork 0

zhangjungang/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
sniffer.go 9.31 KB
一键复制 编辑 原始数据 按行查看 历史
package sniffer
import (
"fmt"
"io"
"os"
"strconv"
"syscall"
"time"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/decoder"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/icmp"
"github.com/elastic/beats/packetbeat/protos/tcp"
"github.com/elastic/beats/packetbeat/protos/udp"
"github.com/tsg/gopacket"
"github.com/tsg/gopacket/layers"
"github.com/tsg/gopacket/pcap"
)
type SnifferSetup struct {
pcapHandle *pcap.Handle
afpacketHandle *AfpacketHandle
pfringHandle *PfringHandle
config *config.InterfacesConfig
isAlive bool
dumper *pcap.Dumper
Decoder *decoder.DecoderStruct
DataSource gopacket.PacketDataSource
}
// Computes the block_size and the num_blocks in such a way that the
// allocated mmap buffer is close to but smaller than target_size_mb.
// The restriction is that the block_size must be divisible by both the
// frame size and page size.
func afpacketComputeSize(target_size_mb int, snaplen int, page_size int) (
frame_size int, block_size int, num_blocks int, err error) {
if snaplen < page_size {
frame_size = page_size / (page_size / snaplen)
} else {
frame_size = (snaplen/page_size + 1) * page_size
}
// 128 is the default from the gopacket library so just use that
block_size = frame_size * 128
num_blocks = (target_size_mb * 1024 * 1024) / block_size
if num_blocks == 0 {
return 0, 0, 0, fmt.Errorf("Buffer size too small")
}
return frame_size, block_size, num_blocks, nil
}
func deviceNameFromIndex(index int, devices []string) (string, error) {
if index >= len(devices) {
return "", fmt.Errorf("Looking for device index %d, but there are only %d devices",
index, len(devices))
}
return devices[index], nil
}
// ListDevicesNames returns the list of adapters available for sniffing on
// this computer. If the withDescription parameter is set to true, a human
// readable version of the adapter name is added.
func ListDeviceNames(withDescription bool) ([]string, error) {
devices, err := pcap.FindAllDevs()
if err != nil {
return []string{}, err
}
ret := []string{}
for _, dev := range devices {
if withDescription {
desc := "No description available"
if len(dev.Description) > 0 {
desc = dev.Description
}
ret = append(ret, fmt.Sprintf("%s (%s)", dev.Name, desc))
} else {
ret = append(ret, dev.Name)
}
}
return ret, nil
}
func (sniffer *SnifferSetup) setFromConfig(config *config.InterfacesConfig) error {
var err error
sniffer.config = config
if len(sniffer.config.File) > 0 {
logp.Debug("sniffer", "Reading from file: %s", sniffer.config.File)
// we read file with the pcap provider
sniffer.config.Type = "pcap"
}
// set defaults
if len(sniffer.config.Device) == 0 {
sniffer.config.Device = "any"
}
if index, err := strconv.Atoi(sniffer.config.Device); err == nil { // Device is numeric
devices, err := ListDeviceNames(false)
if err != nil {
return fmt.Errorf("Error getting devices list: %v", err)
}
sniffer.config.Device, err = deviceNameFromIndex(index, devices)
if err != nil {
return fmt.Errorf("Couldn't understand device index %d: %v", index, err)
}
logp.Info("Resolved device index %d to device: %s", index, sniffer.config.Device)
}
if sniffer.config.Snaplen == 0 {
sniffer.config.Snaplen = 65535
}
if sniffer.config.Type == "autodetect" || sniffer.config.Type == "" {
sniffer.config.Type = "pcap"
}
logp.Debug("sniffer", "Sniffer type: %s device: %s", sniffer.config.Type, sniffer.config.Device)
switch sniffer.config.Type {
case "pcap":
if len(sniffer.config.File) > 0 {
sniffer.pcapHandle, err = pcap.OpenOffline(sniffer.config.File)
if err != nil {
return err
}
} else {
sniffer.pcapHandle, err = pcap.OpenLive(
sniffer.config.Device,
int32(sniffer.config.Snaplen),
true,
500*time.Millisecond)
if err != nil {
return err
}
err = sniffer.pcapHandle.SetBPFFilter(sniffer.config.Bpf_filter)
if err != nil {
return err
}
}
sniffer.DataSource = gopacket.PacketDataSource(sniffer.pcapHandle)
case "af_packet":
if sniffer.config.Buffer_size_mb == 0 {
sniffer.config.Buffer_size_mb = 24
}
frame_size, block_size, num_blocks, err := afpacketComputeSize(
sniffer.config.Buffer_size_mb,
sniffer.config.Snaplen,
os.Getpagesize())
if err != nil {
return err
}
sniffer.afpacketHandle, err = NewAfpacketHandle(
sniffer.config.Device,
frame_size,
block_size,
num_blocks,
500*time.Millisecond)
if err != nil {
return err
}
err = sniffer.afpacketHandle.SetBPFFilter(sniffer.config.Bpf_filter)
if err != nil {
return fmt.Errorf("SetBPFFilter failed: %s", err)
}
sniffer.DataSource = gopacket.PacketDataSource(sniffer.afpacketHandle)
case "pfring", "pf_ring":
sniffer.pfringHandle, err = NewPfringHandle(
sniffer.config.Device,
sniffer.config.Snaplen,
true)
if err != nil {
return err
}
err = sniffer.pfringHandle.SetBPFFilter(sniffer.config.Bpf_filter)
if err != nil {
return fmt.Errorf("SetBPFFilter failed: %s", err)
}
err = sniffer.pfringHandle.Enable()
if err != nil {
return fmt.Errorf("Enable failed: %s", err)
}
sniffer.DataSource = gopacket.PacketDataSource(sniffer.pfringHandle)
default:
return fmt.Errorf("Unknown sniffer type: %s", sniffer.config.Type)
}
return nil
}
func (sniffer *SnifferSetup) Reopen() error {
var err error
if sniffer.config.Type != "pcap" || sniffer.config.File == "" {
return fmt.Errorf("Reopen is only possible for files")
}
sniffer.pcapHandle.Close()
sniffer.pcapHandle, err = pcap.OpenOffline(sniffer.config.File)
if err != nil {
return err
}
sniffer.DataSource = gopacket.PacketDataSource(sniffer.pcapHandle)
return nil
}
func (sniffer *SnifferSetup) Datalink() layers.LinkType {
if sniffer.config.Type == "pcap" {
return sniffer.pcapHandle.LinkType()
}
return layers.LinkTypeEthernet
}
func (sniffer *SnifferSetup) Init(
test_mode bool,
icmp4 icmp.ICMPv4Processor,
icmp6 icmp.ICMPv6Processor,
tcp tcp.Processor,
udp udp.Processor,
) error {
if config.ConfigSingleton.Interfaces.Bpf_filter == "" {
with_vlans := config.ConfigSingleton.Interfaces.With_vlans
with_icmp := config.ConfigSingleton.Protocols.Icmp.Enabled
config.ConfigSingleton.Interfaces.Bpf_filter = protos.Protos.BpfFilter(with_vlans, with_icmp)
}
logp.Debug("sniffer", "BPF filter: %s", config.ConfigSingleton.Interfaces.Bpf_filter)
var err error
if !test_mode {
err = sniffer.setFromConfig(&config.ConfigSingleton.Interfaces)
if err != nil {
return fmt.Errorf("Error creating sniffer: %v", err)
}
}
sniffer.Decoder, err = decoder.NewDecoder(sniffer.Datalink(), icmp4, icmp6, tcp, udp)
if err != nil {
return fmt.Errorf("Error creating decoder: %v", err)
}
if sniffer.config.Dumpfile != "" {
p, err := pcap.OpenDead(sniffer.Datalink(), 65535)
if err != nil {
return err
}
sniffer.dumper, err = p.NewDumper(sniffer.config.Dumpfile)
if err != nil {
return err
}
}
sniffer.isAlive = true
return nil
}
func (sniffer *SnifferSetup) Run() error {
counter := 0
loopCount := 1
var lastPktTime *time.Time = nil
var ret_error error
for sniffer.isAlive {
if sniffer.config.OneAtATime {
fmt.Println("Press enter to read packet")
fmt.Scanln()
}
data, ci, err := sniffer.DataSource.ReadPacketData()
if err == pcap.NextErrorTimeoutExpired || err == syscall.EINTR {
logp.Debug("sniffer", "Interrupted")
continue
}
if err == io.EOF {
logp.Debug("sniffer", "End of file")
loopCount += 1
if sniffer.config.Loop > 0 && loopCount > sniffer.config.Loop {
// give a bit of time to the publish goroutine
// to flush
time.Sleep(300 * time.Millisecond)
sniffer.isAlive = false
continue
}
logp.Debug("sniffer", "Reopening the file")
err = sniffer.Reopen()
if err != nil {
ret_error = fmt.Errorf("Error reopening file: %s", err)
sniffer.isAlive = false
continue
}
lastPktTime = nil
continue
}
if err != nil {
ret_error = fmt.Errorf("Sniffing error: %s", err)
sniffer.isAlive = false
continue
}
if len(data) == 0 {
// Empty packet, probably timeout from afpacket
continue
}
if sniffer.config.File != "" {
if lastPktTime != nil && !sniffer.config.TopSpeed {
sleep := ci.Timestamp.Sub(*lastPktTime)
if sleep > 0 {
time.Sleep(sleep)
} else {
logp.Warn("Time in pcap went backwards: %d", sleep)
}
}
_lastPktTime := ci.Timestamp
lastPktTime = &_lastPktTime
if !sniffer.config.TopSpeed {
ci.Timestamp = time.Now() // overwrite what we get from the pcap
}
}
counter++
if sniffer.dumper != nil {
sniffer.dumper.WritePacketData(data, ci)
}
logp.Debug("sniffer", "Packet number: %d", counter)
sniffer.Decoder.DecodePacketData(data, &ci)
}
logp.Info("Input finish. Processed %d packets. Have a nice day!", counter)
if sniffer.dumper != nil {
sniffer.dumper.Close()
}
return ret_error
}
func (sniffer *SnifferSetup) Close() error {
switch sniffer.config.Type {
case "pcap":
sniffer.pcapHandle.Close()
case "af_packet":
sniffer.afpacketHandle.Close()
case "pfring", "pf_ring":
sniffer.pfringHandle.Close()
}
return nil
}
func (sniffer *SnifferSetup) Stop() error {
sniffer.isAlive = false
return nil
}
func (sniffer *SnifferSetup) IsAlive() bool {
return sniffer.isAlive
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhangjungang/beats.git
git@gitee.com:zhangjungang/beats.git
zhangjungang
beats
beats
v1.1.1

搜索帮助