1 Star 1 Fork 0

bigbase/pg

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
listener.go 2.24 KB
一键复制 编辑 原始数据 按行查看 历史
Vladimir Mihailenco 提交于 2016-03-20 10:19 . Replace connection pool.
package pg
import (
"log"
"sync"
"time"
"gopkg.in/pg.v4/internal/pool"
)
// Not thread-safe.
type Listener struct {
channels []string
db *DB
_cn *pool.Conn
closed bool
mx sync.Mutex
}
func (l *Listener) conn(readTimeout time.Duration) (*pool.Conn, error) {
defer l.mx.Unlock()
l.mx.Lock()
if l.closed {
return nil, errListenerClosed
}
if l._cn == nil {
cn, err := l.db.conn()
if err != nil {
return nil, err
}
l._cn = cn
if len(l.channels) > 0 {
if err := l.listen(cn, l.channels...); err != nil {
return nil, err
}
}
}
l._cn.SetReadTimeout(readTimeout)
l._cn.SetWriteTimeout(l.db.opt.WriteTimeout)
return l._cn, nil
}
func (l *Listener) Listen(channels ...string) error {
cn, err := l.conn(l.db.opt.ReadTimeout)
if err != nil {
return err
}
if err := l.listen(cn, channels...); err != nil {
if err != nil {
l.freeConn(err)
}
return err
}
l.channels = append(l.channels, channels...)
return nil
}
func (l *Listener) listen(cn *pool.Conn, channels ...string) error {
for _, channel := range channels {
if err := writeQueryMsg(cn.Wr, "LISTEN ?", F(channel)); err != nil {
return err
}
}
return cn.Wr.Flush()
}
func (l *Listener) Receive() (channel string, payload string, err error) {
return l.ReceiveTimeout(0)
}
func (l *Listener) ReceiveTimeout(readTimeout time.Duration) (channel, payload string, err error) {
channel, payload, err = l.receiveTimeout(readTimeout)
if err != nil {
l.freeConn(err)
}
return channel, payload, err
}
func (l *Listener) receiveTimeout(readTimeout time.Duration) (channel, payload string, err error) {
cn, err := l.conn(readTimeout)
if err != nil {
return "", "", err
}
return readNotification(cn)
}
func (l *Listener) freeConn(err error) (retErr error) {
if !isBadConn(err, true) {
return nil
}
log.Printf("pg: discarding bad listener connection: %s", err)
return l.closeConn(err)
}
func (l *Listener) closeConn(err error) (retErr error) {
l.mx.Lock()
if l._cn != nil {
retErr = l.db.pool.Remove(l._cn, err)
l._cn = nil
}
l.mx.Unlock()
return retErr
}
func (l *Listener) Close() error {
l.mx.Lock()
closed := l.closed
l.closed = true
l.mx.Unlock()
if closed {
return errListenerClosed
}
return l.closeConn(errListenerClosed)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/bigbase/pg.git
git@gitee.com:bigbase/pg.git
bigbase
pg
pg
v4.0.13

搜索帮助