1 Star 0 Fork 0

qw_1215/glink

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
pg.go 605 Bytes
一键复制 编辑 原始数据 按行查看 历史
qw_1215 提交于 2020-03-24 14:40 +08:00 . 添加pg支持,修改nsq的bug
package sink
import (
"database/sql"
_ "github.com/lib/pq"
)
type PgSink struct {
db *sql.DB
in chan interface{}
}
func NewPgSink(connStr string) (*PgSink,error) {
var err error
var db *sql.DB
db, err = sql.Open("postgres", connStr)
if err != nil {
return nil, err
}
pg := &PgSink{
db: db,
in: make(chan interface{}),
}
go pg.init()
return pg,nil
}
func (pg *PgSink) init() {
for msg := range pg.in {
switch m := msg.(type) {
case string:
_, _ = pg.db.Exec(m)
}
}
}
// In returns channel for receiving data
func (pg *PgSink) In() chan<- interface{} {
return pg.in
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/qw_1215/glink.git
git@gitee.com:qw_1215/glink.git
qw_1215
glink
glink
195e12e86392

搜索帮助