1 Star 0 Fork 12

hlwqds/nsq

forked from Gitee 极速下载/NSQ 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
nsq_to_file.go 13.82 KB
一键复制 编辑 原始数据 按行查看 历史
Matt Reiferson 提交于 2015-04-12 13:00 . s/bitly/nsqio
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
// nsq_to_file is an NSQ consumer that writes each topic's messages to a file, and optionally rotates (rolls) that file.
package main
import (
"compress/gzip"
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"path"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
"time"
"github.com/nsqio/go-nsq"
"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/clusterinfo"
"github.com/nsqio/nsq/internal/http_api"
"github.com/nsqio/nsq/internal/version"
)
// Command-line flags. The repeatable address/topic flags below are
// app.StringArray values; they are registered with flag.Var in init.
var (
	// General consumer options.
	showVersion = flag.Bool("version", false, "print version string")
	channel     = flag.String("channel", "nsq_to_file", "nsq channel")
	maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")

	// Output file naming and encoding.
	outputDir      = flag.String("output-dir", "/tmp", "directory to write output files to")
	datetimeFormat = flag.String("datetime-format", "%Y-%m-%d_%H", "strftime compatible format for <DATETIME> in filename format")
	filenameFormat = flag.String("filename-format", "<TOPIC>.<HOST><REV>.<DATETIME>.log", "output filename format (<TOPIC>, <HOST>, <PID>, <DATETIME>, <REV> are replaced. <REV> is increased when file already exists)")
	hostIdentifier = flag.String("host-identifier", "", "value to output in log filename in place of hostname. <SHORT_HOST> and <HOSTNAME> are valid replacement tokens")
	gzipLevel      = flag.Int("gzip-level", 6, "gzip compression level (1-9, 1=BestSpeed, 9=BestCompression)")
	gzipEnabled    = flag.Bool("gzip", false, "gzip output files.")
	skipEmptyFiles = flag.Bool("skip-empty-files", false, "Skip writing empty files")

	// Topic discovery (only used when topics come from nsqlookupd).
	topicPollRate = flag.Duration("topic-refresh", time.Minute, "how frequently the topic list should be refreshed")
	topicPattern  = flag.String("topic-pattern", ".*", "Only log topics matching the following pattern")

	// Rotation triggers; 0 disables the corresponding trigger.
	rotateSize     = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
	rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")

	// Repeatable flags, filled in by flag.Var registrations in init.
	nsqdTCPAddrs     = app.StringArray{}
	lookupdHTTPAddrs = app.StringArray{}
	topics           = app.StringArray{}

	// TODO: remove, deprecated
	gzipCompression = flag.Int("gzip-compression", 3, "(deprecated) use --gzip-level, gzip compression level (1 = BestSpeed, 2 = BestCompression, 3 = DefaultCompression)")
)
// init registers the repeatable command-line flags whose values are
// accumulated into app.StringArray slices.
func init() {
	repeatable := []struct {
		value flag.Value
		name  string
		usage string
	}{
		{&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)"},
		{&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)"},
		{&topics, "topic", "nsq topic (may be given multiple times)"},
	}
	for _, def := range repeatable {
		flag.Var(def.value, def.name, def.usage)
	}
}
// FileLogger writes one topic's messages to an output file, optionally
// gzip-compressing and rotating it. Messages arrive via HandleMessage and are
// written and synced by router.
type FileLogger struct {
	// out is the current output file; nil when no file is open.
	out *os.File
	// writer is where message bodies are written: either f itself, or
	// gzipWriter (which writes its compressed output through f).
	writer io.Writer
	// gzipWriter is only set when gzip output is enabled.
	gzipWriter *gzip.Writer
	// logChan carries messages from HandleMessage to router.
	logChan chan *nsq.Message
	// compressionLevel is passed to gzip.NewWriterLevel.
	compressionLevel int
	// gzipEnabled selects gzip output (and O_EXCL instead of O_APPEND opens).
	gzipEnabled bool
	// filenameFormat has <TOPIC>, <HOST> and <PID> already substituted by
	// NewFileLogger; <DATETIME> is substituted by calculateCurrentFilename and
	// <REV> by updateFile.
	filenameFormat string
	// ExitChan is closed by router when it exits.
	ExitChan chan int
	// termChan asks router to stop the consumer and flush.
	termChan chan bool
	// hupChan asks router to sync and close the current file.
	hupChan chan bool

	// for rotation

	// lastFilename is the current file's name before <REV> substitution.
	lastFilename string
	// lastOpenTime is when the current file was opened (for --rotate-interval).
	lastOpenTime time.Time
	// filesize is the current file's size in bytes as written through Write
	// (compressed bytes when gzip is enabled), for --rotate-size.
	filesize int64
	// rev is the number substituted for <REV>.
	rev uint
}
// ConsumerFileLogger pairs a topic's FileLogger with the nsq.Consumer that
// feeds it.
type ConsumerFileLogger struct {
	F *FileLogger
	C *nsq.Consumer
}
// TopicDiscoverer owns one ConsumerFileLogger per topic and relays process
// signals to them.
type TopicDiscoverer struct {
	// topics maps a topic name to its logger.
	topics map[string]*ConsumerFileLogger
	// termChan receives SIGINT/SIGTERM (registered in main).
	termChan chan os.Signal
	// hupChan receives SIGHUP (registered in main).
	hupChan chan os.Signal
	// wg counts running topic routers (see startTopicRouter).
	wg sync.WaitGroup
	// cfg is the consumer configuration used for every topic.
	cfg *nsq.Config
}
// newTopicDiscoverer returns a TopicDiscoverer with no topics that will use
// cfg for every consumer it creates.
//
// termChan and hupChan are handed to signal.Notify, which never blocks when
// delivering a signal: with an unbuffered channel a signal that arrives while
// watch is busy (e.g. syncing topics or relaying a HUP) would be dropped.
// A buffer of one is enough to never lose a pending signal.
func newTopicDiscoverer(cfg *nsq.Config) *TopicDiscoverer {
	return &TopicDiscoverer{
		topics:   make(map[string]*ConsumerFileLogger),
		termChan: make(chan os.Signal, 1),
		hupChan:  make(chan os.Signal, 1),
		cfg:      cfg,
	}
}
// HandleMessage is the handler registered on the consumer. It disables
// go-nsq's automatic response for msg and queues it on logChan; router
// finishes the message itself once it has been synced to the output file.
// It blocks until router accepts the message and always returns nil.
func (f *FileLogger) HandleMessage(msg *nsq.Message) error {
	msg.DisableAutoResponse()
	f.logChan <- msg
	return nil
}
// router is the per-topic event loop that owns the output file.
//
// Messages arriving on logChan are written through f.writer and held in a
// buffer of up to *maxInFlight entries. They are finished (m.Finish) only
// after f.Sync succeeds, so a message is acknowledged only once it has been
// fsynced to the output file.
//
// A sync happens when the buffer is full, after a rotation, on the 30-second
// ticker, on hupChan, on termChan, when the consumer stops, or whenever
// r.IsStarved() reports true. termChan stops the consumer; the loop keeps
// running until r.StopChan fires, then syncs, closes the file, closes
// f.ExitChan and returns. Write/sync failures terminate the process.
func (f *FileLogger) router(r *nsq.Consumer) {
	pos := 0
	output := make([]*nsq.Message, *maxInFlight)
	// Named needSync rather than sync so the sync package is not shadowed.
	needSync := false
	ticker := time.NewTicker(30 * time.Second)
	// Stop the ticker on every exit path, including when r.StopChan fires
	// without a prior termChan.
	defer ticker.Stop()
	closing := false
	closeFile := false
	exit := false

	for {
		select {
		case <-r.StopChan:
			needSync = true
			closeFile = true
			exit = true
		case <-f.termChan:
			ticker.Stop()
			r.Stop()
			needSync = true
			closing = true
		case <-f.hupChan:
			needSync = true
			closeFile = true
		case <-ticker.C:
			if f.needsFileRotate() {
				if *skipEmptyFiles {
					// Don't create a new, possibly empty, file until a
					// message actually arrives.
					closeFile = true
				} else {
					f.updateFile()
				}
			}
			needSync = true
		case m := <-f.logChan:
			if f.needsFileRotate() {
				f.updateFile()
				needSync = true
			}
			_, err := f.writer.Write(m.Body)
			if err != nil {
				log.Fatalf("ERROR: writing message to disk - %s", err)
			}
			_, err = f.writer.Write([]byte("\n"))
			if err != nil {
				log.Fatalf("ERROR: writing newline to disk - %s", err)
			}
			output[pos] = m
			pos++
			if pos == cap(output) {
				needSync = true
			}
		}

		if closing || needSync || r.IsStarved() {
			if pos > 0 {
				log.Printf("syncing %d records to disk", pos)
				err := f.Sync()
				if err != nil {
					log.Fatalf("ERROR: failed syncing messages - %s", err)
				}
				// Only now that the data is synced is it safe to FIN.
				for pos > 0 {
					pos--
					m := output[pos]
					m.Finish()
					output[pos] = nil
				}
			}
			needSync = false
		}

		if closeFile {
			f.Close()
			closeFile = false
		}

		if exit {
			close(f.ExitChan)
			return
		}
	}
}
// Close flushes and closes the current output file, if one is open, and
// leaves the logger with no open file (f.out == nil).
//
// The gzip writer is closed first. That flushes its buffered data and writes
// the gzip trailer into the file, so this data is covered by the fsync that
// follows; previously Sync ran before the gzip close and the tail of the
// file was not synced. Errors are logged because Close has no error return.
func (f *FileLogger) Close() {
	if f.out == nil {
		return
	}
	if f.gzipWriter != nil {
		if err := f.gzipWriter.Close(); err != nil {
			log.Printf("ERROR: closing gzip writer for %s - %s", f.out.Name(), err)
		}
		f.gzipWriter = nil
	}
	if err := f.out.Sync(); err != nil {
		log.Printf("ERROR: syncing %s - %s", f.out.Name(), err)
	}
	if err := f.out.Close(); err != nil {
		log.Printf("ERROR: closing %s - %s", f.out.Name(), err)
	}
	f.out = nil
}
// Write implements io.Writer on top of the current output file and keeps
// filesize up to date for --rotate-size. When gzip is enabled this receives
// the compressed stream, so filesize counts compressed bytes.
//
// Only the bytes actually written are counted, so a short or failed write
// does not inflate filesize.
func (f *FileLogger) Write(p []byte) (n int, err error) {
	n, err = f.out.Write(p)
	f.filesize += int64(n)
	return n, err
}
// Sync flushes all buffered output into the current file and fsyncs it.
//
// With gzip enabled, the gzip writer is closed, which flushes its data and
// ends the current gzip member. A fresh writer is then started for later
// messages, so the file becomes a series of concatenated gzip members.
// Every error is returned: the caller finishes messages only when Sync
// succeeds, so a failed flush must not be reported as success.
func (f *FileLogger) Sync() error {
	if f.gzipWriter == nil {
		return f.out.Sync()
	}

	if err := f.gzipWriter.Close(); err != nil {
		return err
	}
	if err := f.out.Sync(); err != nil {
		return err
	}

	gw, err := gzip.NewWriterLevel(f, f.compressionLevel)
	if err != nil {
		return err
	}
	f.gzipWriter = gw
	f.writer = gw
	return nil
}
// calculateCurrentFilename returns filenameFormat with every <DATETIME>
// replaced by the current local time rendered with --datetime-format.
// <REV>, if present, is left for updateFile to fill in.
func (f *FileLogger) calculateCurrentFilename() string {
	stamp := strftime(*datetimeFormat, time.Now())
	return strings.Replace(f.filenameFormat, "<DATETIME>", stamp, -1)
}
// needsFileRotate reports whether the current output file should be replaced.
// That is the case when:
//   - no file is open;
//   - the <DATETIME>-expanded name has changed;
//   - the file has been open longer than --rotate-interval; or
//   - the file has grown beyond --rotate-size bytes.
//
// Every reason except "no file open" is logged.
func (f *FileLogger) needsFileRotate() bool {
	if f.out == nil {
		return true
	}

	if name := f.calculateCurrentFilename(); name != f.lastFilename {
		log.Printf("INFO: new filename %s, need rotate", name)
		return true // rotate by filename
	}

	if limit := *rotateInterval; limit > 0 {
		if age := time.Since(f.lastOpenTime); age > limit {
			log.Printf("INFO: %s since last open, need rotate", age)
			return true // rotate by interval
		}
	}

	if limit := *rotateSize; limit > 0 && f.filesize > limit {
		log.Printf("INFO: %s current %d bytes, need rotate", f.out.Name(), f.filesize)
		return true // rotate by size
	}

	return false
}
// updateFile closes the current output file (if any) and opens the file the
// logger should be writing to now.
//
// The name comes from calculateCurrentFilename, joined onto --output-dir.
// <REV> is reset to 0 when that name differs from the previous one, otherwise
// incremented. It is then advanced until a usable file is found:
//   - with gzip, files are opened with O_EXCL, so any existing file is skipped;
//   - without gzip, an existing file is appended to, unless it is non-empty
//     and already needs rotation.
//
// Unrecoverable filesystem errors terminate the process via log.Fatalf.
func (f *FileLogger) updateFile() {
	filename := f.calculateCurrentFilename()
	if filename != f.lastFilename {
		f.rev = 0 // reset revision to 0 if it is a new filename
	} else {
		f.rev++
	}
	f.lastFilename = filename
	f.lastOpenTime = time.Now()

	// filepath (not path) because this is an OS filesystem path.
	fullPath := filepath.Join(*outputDir, filename)
	dir, _ := filepath.Split(fullPath)
	if dir != "" {
		if err := os.MkdirAll(dir, 0770); err != nil {
			log.Fatalf("ERROR: %s Unable to create %s", err, dir)
		}
	}

	f.Close()

	openFlag := os.O_WRONLY | os.O_CREATE
	if f.gzipEnabled {
		// Never append to an existing gzip file; pick a new <REV> instead.
		openFlag |= os.O_EXCL
	} else {
		openFlag |= os.O_APPEND
	}

	for ; ; f.rev++ {
		absFilename := strings.Replace(fullPath, "<REV>", fmt.Sprintf("-%06d", f.rev), -1)
		out, err := os.OpenFile(absFilename, openFlag, 0666)
		if err != nil {
			if os.IsExist(err) {
				log.Printf("INFO: file already exists: %s", absFilename)
				continue
			}
			log.Fatalf("ERROR: %s Unable to open %s", err, absFilename)
		}
		log.Printf("INFO: opening %s", absFilename)
		f.out = out

		fi, err := out.Stat()
		if err != nil {
			log.Fatalf("ERROR: %s Unable to stat file %s", err, out.Name())
		}
		f.filesize = fi.Size()
		if f.filesize == 0 {
			break // ok, new file
		}
		if f.needsFileRotate() {
			// Close this file before trying the next revision so its
			// descriptor is not leaked.
			out.Close()
			f.out = nil
			continue // next rev
		}
		break // ok, don't need rotate
	}

	if f.gzipEnabled {
		gw, err := gzip.NewWriterLevel(f, f.compressionLevel)
		if err != nil {
			log.Fatalf("ERROR: %s Unable to create gzip writer for %s", err, f.out.Name())
		}
		f.gzipWriter = gw
		f.writer = gw
	} else {
		f.writer = f
	}
}
// NewFileLogger builds a FileLogger for topic. No file is opened here.
//
// filenameFormat is expanded once up front:
//   - the deprecated <GZIPREV> is mapped to <REV>;
//   - <REV> is required when gzip or rotation is enabled, and removed otherwise;
//   - <TOPIC>, <HOST> (the short hostname, or --host-identifier with
//     <SHORT_HOST>/<HOSTNAME> expanded) and <PID> are substituted;
//   - with gzip, ".gz" is appended if missing.
//
// An error is returned if <REV> is required but missing, or if the hostname
// cannot be determined.
func NewFileLogger(gzipEnabled bool, compressionLevel int, filenameFormat, topic string) (*FileLogger, error) {
	// TODO: remove, deprecated, for compat <GZIPREV>
	format := strings.Replace(filenameFormat, "<GZIPREV>", "<REV>", -1)

	revRequired := gzipEnabled || *rotateSize > 0 || *rotateInterval > 0
	switch {
	case revRequired && !strings.Contains(format, "<REV>"):
		return nil, errors.New("missing <REV> in --filename-format when gzip or rotation enabled")
	case !revRequired:
		// <REV> only matters when files can be rotated; drop it.
		format = strings.Replace(format, "<REV>", "", -1)
	}

	hostname, err := os.Hostname()
	if err != nil {
		return nil, err
	}
	shortHostname := strings.Split(hostname, ".")[0]

	identifier := shortHostname
	if *hostIdentifier != "" {
		identifier = strings.Replace(*hostIdentifier, "<SHORT_HOST>", shortHostname, -1)
		identifier = strings.Replace(identifier, "<HOSTNAME>", hostname, -1)
	}

	// Applied one after another, in this order.
	for _, sub := range [][2]string{
		{"<TOPIC>", topic},
		{"<HOST>", identifier},
		{"<PID>", fmt.Sprintf("%d", os.Getpid())},
	} {
		format = strings.Replace(format, sub[0], sub[1], -1)
	}

	if gzipEnabled && !strings.HasSuffix(format, ".gz") {
		format += ".gz"
	}

	return &FileLogger{
		logChan:          make(chan *nsq.Message, 1),
		compressionLevel: compressionLevel,
		filenameFormat:   format,
		gzipEnabled:      gzipEnabled,
		ExitChan:         make(chan int),
		termChan:         make(chan bool),
		hupChan:          make(chan bool),
	}, nil
}
// hasArg reports whether the command-line flag named s was explicitly set.
//
// It must be called after flag.Parse. flag.Visit only visits flags that were
// set, and the name is compared exactly. This avoids false positives when s
// merely appears inside another argument, such as a topic name or an
// output path.
func hasArg(s string) bool {
	found := false
	flag.Visit(func(fl *flag.Flag) {
		if fl.Name == s {
			found = true
		}
	})
	return found
}
// newConsumerFileLogger creates the FileLogger for topic and an nsq.Consumer
// on --channel that feeds it. The consumer is connected to the configured
// nsqd or nsqlookupd addresses.
//
// Connection errors are returned (after stopping the half-built consumer)
// rather than exiting the process, so the caller decides what to do: main
// exits, while syncTopics logs the error and, because the topic is not
// recorded, tries again on its next poll.
func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) {
	f, err := NewFileLogger(*gzipEnabled, *gzipLevel, *filenameFormat, topic)
	if err != nil {
		return nil, err
	}

	consumer, err := nsq.NewConsumer(topic, *channel, cfg)
	if err != nil {
		return nil, err
	}
	consumer.AddHandler(f)

	if err = consumer.ConnectToNSQDs(nsqdTCPAddrs); err != nil {
		consumer.Stop()
		return nil, err
	}
	if err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs); err != nil {
		consumer.Stop()
		return nil, err
	}

	return &ConsumerFileLogger{
		C: consumer,
		F: f,
	}, nil
}
// startTopicRouter runs logger's router and blocks until the router closes
// its ExitChan. It is counted in t.wg so watch can wait for every router on
// shutdown.
//
// NOTE(review): callers start this with `go`, so wg.Add is not ordered
// before wg.Wait by the WaitGroup itself. It is ordered in practice only
// because stop() must hand termChan to the running router before Wait is
// reached. Consider calling Add before the `go` statement.
func (t *TopicDiscoverer) startTopicRouter(logger *ConsumerFileLogger) {
	t.wg.Add(1)
	defer t.wg.Done()

	fl := logger.F
	go fl.router(logger.C)
	<-fl.ExitChan
}
// allowTopicName reports whether name matches the regular expression
// pattern. An invalid pattern is treated as "no match".
func (t *TopicDiscoverer) allowTopicName(pattern string, name string) bool {
	ok, err := regexp.MatchString(pattern, name)
	return err == nil && ok
}
// syncTopics fetches the topic list from the nsqlookupd instances at addrs.
// For every topic not yet known that matches pattern, it creates a logger
// and starts its router.
//
// A lookup error is logged but does not abort the sync; whatever topics
// were returned are still processed (presumably a partial result when only
// some lookupds failed — verify against clusterinfo).
func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string) {
	ci := clusterinfo.New(nil, http_api.NewClient(nil))
	newTopics, err := ci.GetLookupdTopics(addrs)
	if err != nil {
		log.Printf("ERROR: could not retrieve topic list: %s", err)
	}

	for _, topic := range newTopics {
		if _, known := t.topics[topic]; known {
			continue
		}
		if !t.allowTopicName(pattern, topic) {
			log.Println("Skipping topic ", topic, "as it didn't match required pattern:", pattern)
			continue
		}
		logger, lerr := newConsumerFileLogger(topic, t.cfg)
		if lerr != nil {
			log.Printf("ERROR: couldn't create logger for new topic %s: %s", topic, lerr)
			continue
		}
		t.topics[topic] = logger
		go t.startTopicRouter(logger)
	}
}
// stop asks every topic router to shut down by sending on its termChan.
// termChan is unbuffered, so each send blocks until that router receives it.
func (t *TopicDiscoverer) stop() {
	for name := range t.topics {
		t.topics[name].F.termChan <- true
	}
}
// hup forwards a SIGHUP to every topic router via its hupChan; each router
// syncs and closes its current output file on receipt. Each send blocks
// until that router receives it.
func (t *TopicDiscoverer) hup() {
	for name := range t.topics {
		t.topics[name].F.hupChan <- true
	}
}
// watch is the main loop. It relays SIGHUP to the routers and, on
// SIGINT/SIGTERM, stops every router and waits for all of them to exit
// before returning.
//
// When discover is true, new topics are picked up from addrs every
// --topic-refresh, filtered by pattern. The ticker is only created in that
// case and when the interval is positive (time.NewTicker panics otherwise;
// the previous time.Tick silently never fired). It is stopped on return
// instead of leaking as time.Tick's did.
func (t *TopicDiscoverer) watch(addrs []string, discover bool, pattern string) {
	var tick <-chan time.Time // nil channel: never fires
	if discover && *topicPollRate > 0 {
		ticker := time.NewTicker(*topicPollRate)
		defer ticker.Stop()
		tick = ticker.C
	}

	for {
		select {
		case <-tick:
			t.syncTopics(addrs, pattern)
		case <-t.termChan:
			t.stop()
			t.wg.Wait()
			return
		case <-t.hupChan:
			t.hup()
		}
	}
}
// main parses and validates the flags, builds the consumer configuration and
// starts one FileLogger/consumer per topic. Topics come from --topic or, if
// none are given, from nsqlookupd. It then runs the discoverer until
// SIGINT/SIGTERM.
func main() {
	cfg := nsq.NewConfig()
	// TODO: remove, deprecated
	flag.Var(&nsq.ConfigFlag{Config: cfg}, "reader-opt", "(deprecated) use --consumer-opt")
	flag.Var(&nsq.ConfigFlag{Config: cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")

	flag.Parse()

	if *showVersion {
		fmt.Printf("nsq_to_file v%s\n", version.Binary)
		return
	}

	if *channel == "" {
		log.Fatal("--channel is required")
	}

	var topicsFromNSQLookupd bool

	if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
		log.Fatal("--nsqd-tcp-address or --lookupd-http-address required.")
	}
	if len(nsqdTCPAddrs) != 0 && len(lookupdHTTPAddrs) != 0 {
		log.Fatal("use --nsqd-tcp-address or --lookupd-http-address not both")
	}

	if *gzipLevel < 1 || *gzipLevel > 9 {
		log.Fatalf("invalid --gzip-level value (%d), should be 1-9", *gzipLevel)
	}

	// TODO: remove, deprecated
	if hasArg("gzip-compression") {
		log.Printf("WARNING: --gzip-compression is deprecated in favor of --gzip-level")
		switch *gzipCompression {
		case 1:
			*gzipLevel = gzip.BestSpeed
		case 2:
			*gzipLevel = gzip.BestCompression
		case 3:
			*gzipLevel = gzip.DefaultCompression
		default:
			log.Fatalf("invalid --gzip-compression value (%d), should be 1,2,3", *gzipCompression)
		}
	}

	// Reject a malformed --topic-pattern up front. allowTopicName treats a
	// regexp compile error as "no match", so a typo would otherwise silently
	// skip every topic.
	if _, err := regexp.Compile(*topicPattern); err != nil {
		log.Fatalf("invalid --topic-pattern value (%s) - %s", *topicPattern, err)
	}

	cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION)
	cfg.MaxInFlight = *maxInFlight

	discoverer := newTopicDiscoverer(cfg)

	signal.Notify(discoverer.hupChan, syscall.SIGHUP)
	signal.Notify(discoverer.termChan, syscall.SIGINT, syscall.SIGTERM)

	if len(topics) < 1 {
		if len(lookupdHTTPAddrs) < 1 {
			log.Fatal("use --topic to list at least one topic to subscribe to or specify at least one --lookupd-http-address to subscribe to all its topics")
		}
		topicsFromNSQLookupd = true
		var err error
		topics, err = clusterinfo.New(nil, http_api.NewClient(nil)).GetLookupdTopics(lookupdHTTPAddrs)
		if err != nil {
			log.Fatalf("ERROR: could not retrieve topic list: %s", err)
		}
	}

	for _, topic := range topics {
		if !discoverer.allowTopicName(*topicPattern, topic) {
			log.Println("Skipping topic", topic, "as it didn't match required pattern:", *topicPattern)
			continue
		}

		logger, err := newConsumerFileLogger(topic, cfg)
		if err != nil {
			log.Fatalf("ERROR: couldn't create logger for topic %s: %s", topic, err)
		}
		discoverer.topics[topic] = logger
		go discoverer.startTopicRouter(logger)
	}

	discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/hlwqds/nsq.git
git@gitee.com:hlwqds/nsq.git
hlwqds
nsq
nsq
v0.3.6

搜索帮助