1 Star 1 Fork 0

bigbase/pg

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
listener.go 4.05 KB
一键复制 编辑 原始数据 按行查看 历史
Vladimir Mihailenco 提交于 2017-05-08 15:38 . Fix channel name quotting
package pg
import (
"sync"
"time"
"github.com/go-pg/pg/internal"
"github.com/go-pg/pg/internal/pool"
"github.com/go-pg/pg/types"
)
// A notification received with LISTEN command.
type Notification struct {
Channel string
Payload string
}
// Listener listens for notifications sent with NOTIFY command.
// It's NOT safe for concurrent use by multiple goroutines
// except the Channel API.
type Listener struct {
db *DB
channels []string
mu sync.Mutex
cn *pool.Conn
closed bool
}
func (ln *Listener) conn(readTimeout time.Duration) (*pool.Conn, error) {
ln.mu.Lock()
cn, err := ln._conn(readTimeout)
ln.mu.Unlock()
if err != nil {
return nil, err
}
cn.SetReadWriteTimeout(readTimeout, ln.db.opt.WriteTimeout)
return cn, nil
}
func (ln *Listener) _conn(readTimeout time.Duration) (*pool.Conn, error) {
if ln.closed {
return nil, errListenerClosed
}
if ln.cn != nil {
return ln.cn, nil
}
cn, err := ln.db.pool.NewConn()
if err != nil {
return nil, err
}
if cn.InitedAt.IsZero() {
if err := ln.db.initConn(cn); err != nil {
_ = ln.db.pool.CloseConn(cn)
return nil, err
}
cn.InitedAt = time.Now()
}
if len(ln.channels) > 0 {
if err := ln.listen(cn, ln.channels...); err != nil {
_ = ln.db.pool.CloseConn(cn)
return nil, err
}
}
ln.cn = cn
return cn, nil
}
func (ln *Listener) freeConn(cn *pool.Conn, err error) {
if !isBadConn(err, true) {
return
}
ln.mu.Lock()
if cn == ln.cn {
_ = ln.closeConn(err)
}
ln.mu.Unlock()
}
func (ln *Listener) closeConn(reason error) error {
if !ln.closed {
internal.Logf("pg: discarding bad listener connection: %s", reason)
}
err := ln.db.pool.CloseConn(ln.cn)
ln.cn = nil
return err
}
// Close closes the listener, releasing any open resources.
func (ln *Listener) Close() error {
ln.mu.Lock()
defer ln.mu.Unlock()
if ln.closed {
return errListenerClosed
}
ln.closed = true
if ln.cn != nil {
return ln.closeConn(errListenerClosed)
}
return nil
}
// Channel returns a channel for concurrently receiving notifications.
// The channel is closed with Listener.
func (ln *Listener) Channel() <-chan *Notification {
ch := make(chan *Notification, 100)
go func() {
for {
channel, payload, err := ln.ReceiveTimeout(5 * time.Second)
if err != nil {
if err == errListenerClosed {
break
}
continue
}
ch <- &Notification{channel, payload}
}
close(ch)
}()
return ch
}
// Listen starts listening for notifications on channels.
func (ln *Listener) Listen(channels ...string) error {
cn, err := ln.conn(ln.db.opt.ReadTimeout)
if err != nil {
return err
}
if err := ln.listen(cn, channels...); err != nil {
if err != nil {
ln.freeConn(cn, err)
}
return err
}
ln.channels = appendIfNotExists(ln.channels, channels...)
return nil
}
func (ln *Listener) listen(cn *pool.Conn, channels ...string) error {
for _, channel := range channels {
if err := writeQueryMsg(cn.Wr, ln.db, "LISTEN ?", pgChan(channel)); err != nil {
return err
}
}
return cn.FlushWriter()
}
// Receive indefinitely waits for a notification.
func (ln *Listener) Receive() (channel string, payload string, err error) {
return ln.ReceiveTimeout(0)
}
// ReceiveTimeout waits for a notification until timeout is reached.
func (ln *Listener) ReceiveTimeout(timeout time.Duration) (channel, payload string, err error) {
cn, err := ln.conn(timeout)
if err != nil {
return "", "", err
}
channel, payload, err = readNotification(cn)
if err != nil {
ln.freeConn(cn, err)
return "", "", err
}
return channel, payload, nil
}
func appendIfNotExists(ss []string, es ...string) []string {
loop:
for _, e := range es {
for _, s := range ss {
if s == e {
continue loop
}
}
ss = append(ss, e)
}
return ss
}
type pgChan string
var _ types.ValueAppender = pgChan("")
func (ch pgChan) AppendValue(b []byte, quote int) ([]byte, error) {
if quote == 0 {
return append(b, ch...), nil
}
b = append(b, '"')
for _, c := range []byte(ch) {
if c == '"' {
b = append(b, '"', '"')
} else {
b = append(b, c)
}
}
b = append(b, '"')
return b, nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/bigbase/pg.git
git@gitee.com:bigbase/pg.git
bigbase
pg
pg
v6.4.5

搜索帮助