1 Star 0 Fork 0

lqinggang/psiphon-tunnel-core

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
net.go 8.77 KB
一键复制 编辑 原始数据 按行查看 历史
/*
* Copyright (c) 2016, Psiphon Inc.
* All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package common
import (
"container/list"
"net"
"sync"
"sync/atomic"
"time"
"github.com/Psiphon-Inc/goarista/monotime"
)
// Conns is a synchronized list of Conns that is used to coordinate
// interrupting a set of goroutines establishing connections, or
// close a set of open connections, etc.
// Once the list is closed, no more items may be added to the
// list (unless it is reset).
type Conns struct {
mutex sync.Mutex
isClosed bool
conns map[net.Conn]bool
}
func (conns *Conns) Reset() {
conns.mutex.Lock()
defer conns.mutex.Unlock()
conns.isClosed = false
conns.conns = make(map[net.Conn]bool)
}
func (conns *Conns) Add(conn net.Conn) bool {
conns.mutex.Lock()
defer conns.mutex.Unlock()
if conns.isClosed {
return false
}
if conns.conns == nil {
conns.conns = make(map[net.Conn]bool)
}
conns.conns[conn] = true
return true
}
func (conns *Conns) Remove(conn net.Conn) {
conns.mutex.Lock()
defer conns.mutex.Unlock()
delete(conns.conns, conn)
}
func (conns *Conns) CloseAll() {
conns.mutex.Lock()
defer conns.mutex.Unlock()
conns.isClosed = true
for conn, _ := range conns.conns {
conn.Close()
}
conns.conns = make(map[net.Conn]bool)
}
// IPAddressFromAddr is a helper which extracts an IP address
// from a net.Addr or returns "" if there is no IP address.
func IPAddressFromAddr(addr net.Addr) string {
ipAddress := ""
if addr != nil {
host, _, err := net.SplitHostPort(addr.String())
if err == nil {
ipAddress = host
}
}
return ipAddress
}
// LRUConns is a concurrency-safe list of net.Conns ordered
// by recent activity. Its purpose is to facilitate closing
// the oldest connection in a set of connections.
//
// New connections added are referenced by a LRUConnsEntry,
// which is used to Touch() active connections, which
// promotes them to the front of the order and to Remove()
// connections that are no longer LRU candidates.
//
// CloseOldest() will remove the oldest connection from the
// list and call net.Conn.Close() on the connection.
//
// After an entry has been removed, LRUConnsEntry Touch()
// and Remove() will have no effect.
type LRUConns struct {
mutex sync.Mutex
list *list.List
}
// NewLRUConns initializes a new LRUConns.
func NewLRUConns() *LRUConns {
return &LRUConns{list: list.New()}
}
// Add inserts a net.Conn as the freshest connection
// in a LRUConns and returns an LRUConnsEntry to be
// used to freshen the connection or remove the connection
// from the LRU list.
func (conns *LRUConns) Add(conn net.Conn) *LRUConnsEntry {
conns.mutex.Lock()
defer conns.mutex.Unlock()
return &LRUConnsEntry{
lruConns: conns,
element: conns.list.PushFront(conn),
}
}
// CloseOldest closes the oldest connection in a
// LRUConns. It calls net.Conn.Close() on the
// connection.
func (conns *LRUConns) CloseOldest() {
conns.mutex.Lock()
oldest := conns.list.Back()
if oldest != nil {
conns.list.Remove(oldest)
}
// Release mutex before closing conn
conns.mutex.Unlock()
if oldest != nil {
oldest.Value.(net.Conn).Close()
}
}
// LRUConnsEntry is an entry in a LRUConns list.
type LRUConnsEntry struct {
lruConns *LRUConns
element *list.Element
}
// Remove deletes the connection referenced by the
// LRUConnsEntry from the associated LRUConns.
// Has no effect if the entry was not initialized
// or previously removed.
func (entry *LRUConnsEntry) Remove() {
if entry.lruConns == nil || entry.element == nil {
return
}
entry.lruConns.mutex.Lock()
defer entry.lruConns.mutex.Unlock()
entry.lruConns.list.Remove(entry.element)
}
// Touch promotes the connection referenced by the
// LRUConnsEntry to the front of the associated LRUConns.
// Has no effect if the entry was not initialized
// or previously removed.
func (entry *LRUConnsEntry) Touch() {
if entry.lruConns == nil || entry.element == nil {
return
}
entry.lruConns.mutex.Lock()
defer entry.lruConns.mutex.Unlock()
entry.lruConns.list.MoveToFront(entry.element)
}
// ActivityMonitoredConn wraps a net.Conn, adding logic to deal with
// events triggered by I/O activity.
//
// When an inactivity timeout is specified, the network I/O will
// timeout after the specified period of read inactivity. Optionally,
// for the purpose of inactivity only, ActivityMonitoredConn will also
// consider the connection active when data is written to it.
//
// When a LRUConnsEntry is specified, then the LRU entry is promoted on
// either a successful read or write.
//
// When an ActivityUpdater is set, then its UpdateActivity method is
// called on each read and write with the number of bytes transferred.
// The durationNanoseconds, which is the time since the last read, is
// reported only on reads.
//
type ActivityMonitoredConn struct {
// Note: 64-bit ints used with atomic operations are at placed
// at the start of struct to ensure 64-bit alignment.
// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
monotonicStartTime int64
lastReadActivityTime int64
realStartTime time.Time
net.Conn
inactivityTimeout time.Duration
activeOnWrite bool
activityUpdater ActivityUpdater
lruEntry *LRUConnsEntry
}
// ActivityUpdater defines an interface for receiving updates for
// ActivityMonitoredConn activity. Values passed to UpdateProgress are
// bytes transferred and conn duration since the previous UpdateProgress.
type ActivityUpdater interface {
UpdateProgress(bytesRead, bytesWritten int64, durationNanoseconds int64)
}
// NewActivityMonitoredConn creates a new ActivityMonitoredConn.
func NewActivityMonitoredConn(
conn net.Conn,
inactivityTimeout time.Duration,
activeOnWrite bool,
activityUpdater ActivityUpdater,
lruEntry *LRUConnsEntry) (*ActivityMonitoredConn, error) {
if inactivityTimeout > 0 {
err := conn.SetDeadline(time.Now().Add(inactivityTimeout))
if err != nil {
return nil, ContextError(err)
}
}
now := int64(monotime.Now())
return &ActivityMonitoredConn{
Conn: conn,
inactivityTimeout: inactivityTimeout,
activeOnWrite: activeOnWrite,
realStartTime: time.Now(),
monotonicStartTime: now,
lastReadActivityTime: now,
activityUpdater: activityUpdater,
lruEntry: lruEntry,
}, nil
}
// GetStartTime gets the time when the ActivityMonitoredConn was
// initialized. Reported time is UTC.
func (conn *ActivityMonitoredConn) GetStartTime() time.Time {
return conn.realStartTime.UTC()
}
// GetActiveDuration returns the time elapsed between the initialization
// of the ActivityMonitoredConn and the last Read. Only reads are used
// for this calculation since writes may succeed locally due to buffering.
func (conn *ActivityMonitoredConn) GetActiveDuration() time.Duration {
return time.Duration(atomic.LoadInt64(&conn.lastReadActivityTime) - conn.monotonicStartTime)
}
// GetLastActivityTime returns the arbitrary monotonic time of the last Read.
func (conn *ActivityMonitoredConn) GetLastActivityMonotime() monotime.Time {
return monotime.Time(atomic.LoadInt64(&conn.lastReadActivityTime))
}
func (conn *ActivityMonitoredConn) Read(buffer []byte) (int, error) {
n, err := conn.Conn.Read(buffer)
if err == nil {
if conn.inactivityTimeout > 0 {
err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
if err != nil {
return n, ContextError(err)
}
}
readActivityTime := int64(monotime.Now())
if conn.activityUpdater != nil {
conn.activityUpdater.UpdateProgress(
int64(n), 0, readActivityTime-atomic.LoadInt64(&conn.lastReadActivityTime))
}
if conn.lruEntry != nil {
conn.lruEntry.Touch()
}
atomic.StoreInt64(&conn.lastReadActivityTime, readActivityTime)
}
// Note: no context error to preserve error type
return n, err
}
func (conn *ActivityMonitoredConn) Write(buffer []byte) (int, error) {
n, err := conn.Conn.Write(buffer)
if err == nil && conn.activeOnWrite {
if conn.inactivityTimeout > 0 {
err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
if err != nil {
return n, ContextError(err)
}
}
if conn.activityUpdater != nil {
conn.activityUpdater.UpdateProgress(0, int64(n), 0)
}
if conn.lruEntry != nil {
conn.lruEntry.Touch()
}
}
// Note: no context error to preserve error type
return n, err
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/lqinggang/psiphon-tunnel-core.git
git@gitee.com:lqinggang/psiphon-tunnel-core.git
lqinggang
psiphon-tunnel-core
psiphon-tunnel-core
v0.0.13-alpha

搜索帮助