14 Star 53 Fork 30

NtripShare/NtripShareBase

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
WatsonTcpServer.cs 78.28 KB
一键复制 编辑 原始数据 按行查看 历史
NtripShare 提交于 4年前 . 初始版本
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace WatsonTcp
{
/// <summary>
/// Watson TCP server, with or without SSL.
/// </summary>
public class WatsonTcpServer : IDisposable
{
#region Public-Members
/// <summary>
/// Size of the buffer used when reading input and output streams.
/// The backing field starts at 65536.
/// </summary>
/// <exception cref="ArgumentException">Thrown when a value less than one is assigned.</exception>
public int StreamBufferSize
{
    get => _StreamBufferSize;
    set
    {
        // Only strictly positive sizes are accepted.
        if (value >= 1)
        {
            _StreamBufferSize = value;
            return;
        }

        throw new ArgumentException("Read stream buffer size must be greater than zero.");
    }
}
/// <summary>
/// Number of seconds a client may stay idle before it is considered inactive and disconnected.
/// The default of 0 means clients are never disconnected for inactivity.
/// The idle timer restarts whenever a message is received from, or sent to, the client.
/// Example: with a value of 30, a client is dropped once 30 seconds pass without any message
/// being received from it or sent to it.
/// </summary>
/// <exception cref="ArgumentException">Thrown when a negative value is assigned.</exception>
public int IdleClientTimeoutSeconds
{
    get => _IdleClientTimeoutSeconds;
    set
    {
        // Zero is allowed (it disables the timeout); negatives are rejected.
        if (value >= 0)
        {
            _IdleClientTimeoutSeconds = value;
            return;
        }

        throw new ArgumentException("IdleClientTimeoutSeconds must be zero or greater.");
    }
}
/// <summary>
/// Upper limit on the number of connections the server will accept.
/// </summary>
/// <exception cref="ArgumentException">Thrown when a value less than one is assigned.</exception>
public int MaxConnections
{
    get => _MaxConnections;
    set
    {
        // At least one connection must be permitted.
        if (value >= 1)
        {
            _MaxConnections = value;
            return;
        }

        throw new ArgumentException("Max connections must be greater than zero.");
    }
}
/// <summary>
/// Number of clients currently connected (read-only view of the internal counter).
/// </summary>
public int Connections => _Connections;
/// <summary>
/// Indicates whether Watson TCP is listening for incoming TCP connections (read-only).
/// </summary>
public bool IsListening => _IsListening;
/// <summary>
/// Enable or disable message debugging. Requires `Logger` to be set.
/// When true, the send methods pass `Logger` into each outgoing message; when false they pass null.
/// Defaults to false.
/// WARNING: Setting this value to true will emit a large number of log messages with a large amount of data.
/// </summary>
public bool DebugMessages = false;
/// <summary>
/// Permitted IP addresses. Initialized to an empty list.
/// NOTE(review): how this list is enforced is not visible in this part of the file - confirm against the connection-accept logic.
/// </summary>
public List<string> PermittedIPs = new List<string>();
/// <summary>
/// Event to fire when authentication is requested from a client.
/// </summary>
public event EventHandler<AuthenticationRequestedEventArgs> AuthenticationRequested;
/// <summary>
/// Event to fire when a client successfully authenticates.
/// </summary>
public event EventHandler<AuthenticationSucceededEventArgs> AuthenticationSucceeded;
/// <summary>
/// Event to fire when a client fails authentication.
/// </summary>
public event EventHandler<AuthenticationFailedEventArgs> AuthenticationFailed;
/// <summary>
/// Event to fire when a client connects to the server.
/// The IP:port of the client is passed in the arguments.
/// </summary>
public event EventHandler<ClientConnectedEventArgs> ClientConnected;
/// <summary>
/// Event to fire when a client disconnects from the server.
/// The IP:port is passed in the arguments along with the reason for the disconnection.
/// </summary>
public event EventHandler<ClientDisconnectedEventArgs> ClientDisconnected;
/// <summary>
/// Raised when a message is received from a client and WatsonTcp should hand over the payload as a byte array.
/// 'MessageReceived' and 'StreamReceived' are mutually exclusive: a handler cannot be added here
/// while 'StreamReceived' already has one.
/// </summary>
/// <exception cref="InvalidOperationException">Thrown on subscription when 'StreamReceived' already has a handler.</exception>
public event EventHandler<MessageReceivedFromClientEventArgs> MessageReceived
{
    add
    {
        bool streamHandlerAttached =
            _StreamReceived != null
            && _StreamReceived.GetInvocationList().Length > 0;

        if (streamHandlerAttached)
        {
            throw new InvalidOperationException("Only one of 'MessageReceived' and 'StreamReceived' can be set.");
        }

        _MessageReceived += value;
    }
    remove => _MessageReceived -= value;
}
/// <summary>
/// Raised when a stream is received from a client and WatsonTcp should hand the payload stream to your application.
/// 'StreamReceived' and 'MessageReceived' are mutually exclusive: a handler cannot be added here
/// while 'MessageReceived' already has one.
/// </summary>
/// <exception cref="InvalidOperationException">Thrown on subscription when 'MessageReceived' already has a handler.</exception>
public event EventHandler<StreamReceivedFromClientEventArgs> StreamReceived
{
    add
    {
        bool messageHandlerAttached =
            _MessageReceived != null
            && _MessageReceived.GetInvocationList().Length > 0;

        if (messageHandlerAttached)
        {
            throw new InvalidOperationException("Only one of 'MessageReceived' and 'StreamReceived' can be set.");
        }

        _StreamReceived += value;
    }
    remove => _StreamReceived -= value;
}
/// <summary>
/// Callback invoked when a synchronous request arrives that requires a response.
/// The callback receives the request and returns the response. May be null.
/// </summary>
public Func<SyncRequest, SyncResponse> SyncRequestReceived
{
    get => _SyncRequestReceived;
    set => _SyncRequestReceived = value;
}
/// <summary>
/// Enable acceptance of SSL certificates from clients that cannot be validated.
/// Defaults to true.
/// </summary>
public bool AcceptInvalidCertificates = true;
/// <summary>
/// Require mutual authentication between SSL clients and this server.
/// Defaults to false.
/// </summary>
public bool MutuallyAuthenticate = false;
/// <summary>
/// Preshared key that must be consistent between clients and this server.
/// Defaults to null.
/// </summary>
public string PresharedKey = null;
/// <summary>
/// Type of compression to apply on sent messages.
/// Defaults to CompressionType.None; the send methods pass this value into each outgoing message.
/// </summary>
public CompressionType Compression = CompressionType.None;
/// <summary>
/// Method to invoke when sending a log message.
/// Defaults to null; it is invoked through the null-conditional operator, so leaving it unset disables logging.
/// </summary>
public Action<string> Logger = null;
/// <summary>
/// Watson TCP statistics for this server (read-only accessor for the current statistics object).
/// </summary>
public Statistics Stats => _Stats;
#endregion
#region Private-Members
// Backing fields for the public StreamBufferSize, MaxConnections, Connections,
// IsListening and IdleClientTimeoutSeconds properties.
private int _StreamBufferSize = 65536;
private int _MaxConnections = 4096;
private int _Connections = 0;
private bool _IsListening = false;
private int _IdleClientTimeoutSeconds = 0;
// Transport mode; the constructors set it to Mode.Tcp or Mode.Ssl.
private Mode _Mode;
// Listener endpoint and listener instance, all assigned by the constructors.
private string _ListenerIp;
private int _ListenerPort;
private IPAddress _ListenerIpAddress;
private TcpListener _Listener;
// Certificate assigned by the SSL constructors.
private X509Certificate2 _SslCertificate;
// Per-client bookkeeping. _Clients is looked up by the client's IP:port string in the send methods.
// NOTE(review): the other dictionaries presumably use the same IP:port key - confirm against their usage.
private ConcurrentDictionary<string, DateTime> _UnauthenticatedClients = new ConcurrentDictionary<string, DateTime>();
private ConcurrentDictionary<string, ClientMetadata> _Clients = new ConcurrentDictionary<string, ClientMetadata>();
private ConcurrentDictionary<string, DateTime> _ClientsLastSeen = new ConcurrentDictionary<string, DateTime>();
private ConcurrentDictionary<string, DateTime> _ClientsKicked = new ConcurrentDictionary<string, DateTime>();
private ConcurrentDictionary<string, DateTime> _ClientsTimedout = new ConcurrentDictionary<string, DateTime>();
// Cancellation source; the constructors copy its token into _Token and pass it to Task.Run.
private CancellationTokenSource _TokenSource = new CancellationTokenSource();
private CancellationToken _Token;
// Backing events behind the public MessageReceived / StreamReceived accessors.
private event EventHandler<MessageReceivedFromClientEventArgs> _MessageReceived;
private event EventHandler<StreamReceivedFromClientEventArgs> _StreamReceived;
// Backing field for the SyncRequestReceived property.
private Func<SyncRequest, SyncResponse> _SyncRequestReceived = null;
// NOTE(review): each lock object below presumably guards the collection declared right after it;
// their usage is not visible in this part of the file - confirm.
private readonly object _SyncResponseLock = new object();
private Dictionary<string, SyncResponse> _SyncResponses = new Dictionary<string, SyncResponse>();
private readonly object _ClientsSendLock = new object();
private List<string> _ClientsSending = new List<string>();
private readonly object _ClientsReceiveLock = new object();
private List<string> _ClientsReceiving = new List<string>();
// Backing field for the Stats property; Start() and StartAsync() replace it with a fresh instance.
private Statistics _Stats = new Statistics();
#endregion
#region Constructors-and-Factories
/// <summary>
/// Create a Watson TCP server that does not use SSL.
/// Pass a specific IP address to listen on, or null/empty to listen on any IP address.
/// Listening on any address may require running WatsonTcp with administrative privileges.
/// The constructor also launches the idle-client and expired-sync-response monitor tasks.
/// Call Start() afterward to start the server.
/// </summary>
/// <param name="listenerIp">The IP address on which the server should listen. If null, listen on any IP address (may require administrative privileges).</param>
/// <param name="listenerPort">The TCP port on which the server should listen.</param>
/// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="listenerPort"/> is less than one.</exception>
public WatsonTcpServer(
    string listenerIp,
    int listenerPort)
{
    if (listenerPort < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(listenerPort));
    }

    _Mode = Mode.Tcp;

    // A missing address means "listen on every interface".
    bool listenOnAny = String.IsNullOrEmpty(listenerIp);
    _ListenerIpAddress = listenOnAny ? IPAddress.Any : IPAddress.Parse(listenerIp);
    _ListenerIp = listenOnAny ? _ListenerIpAddress.ToString() : listenerIp;

    _ListenerPort = listenerPort;
    _Listener = new TcpListener(_ListenerIpAddress, _ListenerPort);

    // Background monitors are tied to the server-wide cancellation token.
    _Token = _TokenSource.Token;
    Task.Run(() => MonitorForIdleClients(), _Token);
    Task.Run(() => MonitorForExpiredSyncResponses(), _Token);
}
/// <summary>
/// Create a Watson TCP server that uses SSL, loading the certificate from a PFX file.
/// Pass a specific IP address to listen on, or null/empty to listen on any IP address.
/// Listening on any address may require running WatsonTcp with administrative privileges.
/// The constructor also launches the idle-client and expired-sync-response monitor tasks.
/// Call Start() afterward to start the server.
/// </summary>
/// <param name="listenerIp">The IP address on which the server should listen. If null, listen on any IP address (may require administrative privileges).</param>
/// <param name="listenerPort">The TCP port on which the server should listen.</param>
/// <param name="pfxCertFile">The file containing the SSL certificate.</param>
/// <param name="pfxCertPass">The password for the SSL certificate. If null or empty, the file is loaded without a password.</param>
/// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="listenerPort"/> is less than one.</exception>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="pfxCertFile"/> is null or empty.</exception>
public WatsonTcpServer(
    string listenerIp,
    int listenerPort,
    string pfxCertFile,
    string pfxCertPass)
{
    if (listenerPort < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(listenerPort));
    }

    if (String.IsNullOrEmpty(pfxCertFile))
    {
        throw new ArgumentNullException(nameof(pfxCertFile));
    }

    _Mode = Mode.Ssl;

    // A missing address means "listen on every interface".
    bool listenOnAny = String.IsNullOrEmpty(listenerIp);
    _ListenerIpAddress = listenOnAny ? IPAddress.Any : IPAddress.Parse(listenerIp);
    _ListenerIp = listenOnAny ? _ListenerIpAddress.ToString() : listenerIp;

    // Only supply the password to the certificate loader when one was given.
    _SslCertificate = String.IsNullOrEmpty(pfxCertPass)
        ? new X509Certificate2(pfxCertFile)
        : new X509Certificate2(pfxCertFile, pfxCertPass);

    _ListenerPort = listenerPort;
    _Listener = new TcpListener(_ListenerIpAddress, _ListenerPort);

    // Background monitors are tied to the server-wide cancellation token.
    _Token = _TokenSource.Token;
    Task.Run(() => MonitorForIdleClients(), _Token);
    Task.Run(() => MonitorForExpiredSyncResponses(), _Token);
}
/// <summary>
/// Create a Watson TCP server that uses SSL with an already-loaded certificate.
/// Pass a specific IP address to listen on, or null/empty to listen on any IP address.
/// Listening on any address may require running WatsonTcp with administrative privileges.
/// The constructor also launches the idle-client and expired-sync-response monitor tasks.
/// Call Start() afterward to start the server.
/// </summary>
/// <param name="listenerIp">The IP address on which the server should listen. If null, listen on any IP address (may require administrative privileges).</param>
/// <param name="listenerPort">The TCP port on which the server should listen.</param>
/// <param name="cert">The SSL certificate.</param>
/// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="listenerPort"/> is less than one.</exception>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="cert"/> is null.</exception>
public WatsonTcpServer(
    string listenerIp,
    int listenerPort,
    X509Certificate2 cert)
{
    if (listenerPort < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(listenerPort));
    }

    if (cert == null)
    {
        throw new ArgumentNullException(nameof(cert));
    }

    _Mode = Mode.Ssl;
    _SslCertificate = cert;

    // A missing address means "listen on every interface".
    bool listenOnAny = String.IsNullOrEmpty(listenerIp);
    _ListenerIpAddress = listenOnAny ? IPAddress.Any : IPAddress.Parse(listenerIp);
    _ListenerIp = listenOnAny ? _ListenerIpAddress.ToString() : listenerIp;

    _ListenerPort = listenerPort;
    _Listener = new TcpListener(_ListenerIpAddress, _ListenerPort);

    // Background monitors are tied to the server-wide cancellation token.
    _Token = _TokenSource.Token;
    Task.Run(() => MonitorForIdleClients(), _Token);
    Task.Run(() => MonitorForExpiredSyncResponses(), _Token);
}
#endregion
#region Public-Methods
/// <summary>
/// Tear down the server and dispose of background workers.
/// Forwards to Dispose(true) and then asks the garbage collector not to finalize this instance.
/// </summary>
public void Dispose()
{
Dispose(true);
// Cleanup has already been requested above, so finalization of this object is suppressed.
GC.SuppressFinalize(this);
}
/// <summary>
/// Start the server.
/// Resets the statistics, verifies that a receive handler is attached, logs the listener
/// endpoint, and schedules the connection-accept loop on a background task.
/// </summary>
/// <exception cref="InvalidOperationException">Thrown when neither 'MessageReceived' nor 'StreamReceived' has a handler.</exception>
/// <exception cref="ArgumentException">Thrown when the server mode is neither TCP nor SSL.</exception>
public void Start()
{
    _Stats = new Statistics();

    bool handlerAttached = _StreamReceived != null || _MessageReceived != null;
    if (!handlerAttached)
    {
        throw new InvalidOperationException("Either 'MessageReceived' or 'StreamReceived' must first be set.");
    }

    // Pick the log prefix for the active mode; any other mode is rejected.
    string banner;
    if (_Mode == Mode.Tcp)
    {
        banner = "[WatsonTcpServer] Starting on ";
    }
    else if (_Mode == Mode.Ssl)
    {
        banner = "[WatsonTcpServer] Starting with SSL on ";
    }
    else
    {
        throw new ArgumentException("Unknown mode: " + _Mode.ToString());
    }

    Logger?.Invoke(banner + _ListenerIp + ":" + _ListenerPort);

    Task.Run(() => AcceptConnections(), _Token);
}
/// <summary>
/// Start the server and return the task of the connection-accept loop.
/// Resets the statistics, verifies that a receive handler is attached and logs the listener
/// endpoint before invoking the accept loop directly (it is not wrapped in Task.Run here).
/// Because this method is not itself async, the validation exceptions are thrown synchronously.
/// </summary>
/// <returns>The task returned by the connection-accept loop.</returns>
/// <exception cref="InvalidOperationException">Thrown when neither 'MessageReceived' nor 'StreamReceived' has a handler.</exception>
/// <exception cref="ArgumentException">Thrown when the server mode is neither TCP nor SSL.</exception>
public Task StartAsync()
{
    _Stats = new Statistics();

    bool handlerAttached = _StreamReceived != null || _MessageReceived != null;
    if (!handlerAttached)
    {
        throw new InvalidOperationException("Either 'MessageReceived' or 'StreamReceived' must first be set.");
    }

    // Pick the log prefix for the active mode; any other mode is rejected.
    string banner;
    if (_Mode == Mode.Tcp)
    {
        banner = "[WatsonTcpServer] Starting on ";
    }
    else if (_Mode == Mode.Ssl)
    {
        banner = "[WatsonTcpServer] Starting with SSL on ";
    }
    else
    {
        throw new ArgumentException("Unknown mode: " + _Mode.ToString());
    }

    Logger?.Invoke(banner + _ListenerIp + ":" + _ListenerPort);

    return AcceptConnections();
}
/// <summary>
/// Send string data to the specified client without metadata.
/// Forwards to the metadata-accepting overload with null metadata.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="data">String containing data.</param>
/// <returns>Boolean indicating if the message was sent successfully.</returns>
public bool Send(string ipPort, string data) => Send(ipPort, null, data);
/// <summary>
/// Send string data and metadata to the specified client.
/// The string is encoded as UTF-8; a null or empty string is sent as an empty payload.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="metadata">Dictionary containing metadata.</param>
/// <param name="data">String containing data.</param>
/// <returns>Boolean indicating if the message was sent successfully.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
public bool Send(string ipPort, Dictionary<object, object> metadata, string data)
{
    if (String.IsNullOrEmpty(ipPort))
    {
        throw new ArgumentNullException(nameof(ipPort));
    }

    byte[] payload = String.IsNullOrEmpty(data)
        ? new byte[0]
        : Encoding.UTF8.GetBytes(data);

    return Send(ipPort, metadata, payload);
}
/// <summary>
/// Send a byte array to the specified client without metadata.
/// Forwards to the metadata-accepting overload with null metadata.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="data">Byte array containing data.</param>
/// <returns>Boolean indicating if the message was sent successfully.</returns>
public bool Send(string ipPort, byte[] data) => Send(ipPort, null, data);
/// <summary>
/// Send a byte array and metadata to the specified client.
/// A null array is treated as an empty payload. The bytes are wrapped in a stream and
/// passed to the stream-based overload.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="metadata">Dictionary containing metadata.</param>
/// <param name="data">Byte array containing data.</param>
/// <returns>False when the client is unknown; otherwise the result of the stream-based overload.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
public bool Send(string ipPort, Dictionary<object, object> metadata, byte[] data)
{
    if (String.IsNullOrEmpty(ipPort))
    {
        throw new ArgumentNullException(nameof(ipPort));
    }

    // Bail out for unknown clients before wrapping the payload in a stream.
    if (!_Clients.ContainsKey(ipPort))
    {
        Logger?.Invoke("[WatsonTcpServer] Unable to find client " + ipPort);
        return false;
    }

    byte[] payload = data ?? new byte[0];
    WatsonCommon.BytesToStream(payload, out long payloadLength, out Stream payloadStream);
    return Send(ipPort, metadata, payloadLength, payloadStream);
}
/// <summary>
/// Send stream data to the specified client without metadata.
/// Forwards to the metadata-accepting overload with null metadata.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="contentLength">The number of bytes in the stream.</param>
/// <param name="stream">The stream containing the data.</param>
/// <returns>Boolean indicating if the message was sent successfully.</returns>
public bool Send(string ipPort, long contentLength, Stream stream) => Send(ipPort, null, contentLength, stream);
/// <summary>
/// Send stream data and metadata to the specified client.
/// Builds a message from the arguments and the current compression setting, then passes it
/// to the internal send routine. A null stream is replaced with an empty in-memory stream.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="metadata">Dictionary containing metadata.</param>
/// <param name="contentLength">The number of bytes in the stream.</param>
/// <param name="stream">The stream containing the data.</param>
/// <returns>False when the client is unknown; otherwise the result of the internal send.</returns>
/// <exception cref="ArgumentException">Thrown when <paramref name="contentLength"/> is negative.</exception>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
public bool Send(string ipPort, Dictionary<object, object> metadata, long contentLength, Stream stream)
{
    if (contentLength < 0)
    {
        throw new ArgumentException("Content length must be zero or greater.");
    }

    if (String.IsNullOrEmpty(ipPort))
    {
        throw new ArgumentNullException(nameof(ipPort));
    }

    ClientMetadata recipient;
    bool known = _Clients.TryGetValue(ipPort, out recipient);
    if (!known)
    {
        Logger?.Invoke("[WatsonTcpServer] Unable to find client " + ipPort);
        return false;
    }

    stream = stream ?? new MemoryStream(new byte[0]);

    // The logger is only attached to the message when message debugging is enabled.
    WatsonMessage outgoing = new WatsonMessage(metadata, contentLength, stream, false, false, null, null, Compression, (DebugMessages ? Logger : null));
    return SendInternal(recipient, outgoing, contentLength, stream);
}
/// <summary>
/// Send only metadata to the specified client, with an empty payload.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="metadata">Dictionary containing metadata.</param>
/// <returns>False when the client is unknown; otherwise the result of the internal send.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
public bool Send(string ipPort, Dictionary<object, object> metadata)
{
    if (String.IsNullOrEmpty(ipPort))
    {
        throw new ArgumentNullException(nameof(ipPort));
    }

    ClientMetadata recipient;
    bool known = _Clients.TryGetValue(ipPort, out recipient);
    if (!known)
    {
        Logger?.Invoke("[WatsonTcpServer] Unable to find client " + ipPort);
        return false;
    }

    // The message and the send call each receive their own empty stream.
    // The logger is only attached to the message when message debugging is enabled.
    WatsonMessage metadataOnly = new WatsonMessage(metadata, 0, new MemoryStream(new byte[0]), false, false, null, null, Compression, (DebugMessages ? Logger : null));
    return SendInternal(recipient, metadataOnly, 0, new MemoryStream(new byte[0]));
}
/// <summary>
/// Send string data to the specified client, asynchronously, without metadata.
/// Forwards to the metadata-accepting overload with null metadata.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="data">String containing data.</param>
/// <returns>Task with Boolean indicating if the message was sent successfully.</returns>
public async Task<bool> SendAsync(string ipPort, string data)
{
    // ConfigureAwait(false): this library code does not need the caller's synchronization
    // context, and not capturing it avoids deadlocks when a caller blocks on the returned task.
    return await SendAsync(ipPort, null, data).ConfigureAwait(false);
}
/// <summary>
/// Send string data and metadata to the specified client, asynchronously.
/// The string is encoded as UTF-8; a null or empty string is sent as an empty payload.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="metadata">Dictionary containing metadata.</param>
/// <param name="data">String containing data.</param>
/// <returns>Task with Boolean indicating if the message was sent successfully.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
public async Task<bool> SendAsync(string ipPort, Dictionary<object, object> metadata, string data)
{
    if (String.IsNullOrEmpty(ipPort)) throw new ArgumentNullException(nameof(ipPort));
    byte[] bytes = new byte[0];
    if (!String.IsNullOrEmpty(data)) bytes = Encoding.UTF8.GetBytes(data);

    // ConfigureAwait(false): this library code does not need the caller's synchronization
    // context, and not capturing it avoids deadlocks when a caller blocks on the returned task.
    return await SendAsync(ipPort, metadata, bytes).ConfigureAwait(false);
}
/// <summary>
/// Send a byte array to the specified client, asynchronously, without metadata.
/// Forwards to the metadata-accepting overload with null metadata.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="data">Byte array containing data.</param>
/// <returns>Task with Boolean indicating if the message was sent successfully.</returns>
public async Task<bool> SendAsync(string ipPort, byte[] data)
{
    // ConfigureAwait(false): this library code does not need the caller's synchronization
    // context, and not capturing it avoids deadlocks when a caller blocks on the returned task.
    return await SendAsync(ipPort, null, data).ConfigureAwait(false);
}
/// <summary>
/// Send a byte array and metadata to the specified client, asynchronously.
/// A null array is treated as an empty payload. The bytes are wrapped in a stream and
/// passed to the stream-based overload.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="metadata">Dictionary containing metadata.</param>
/// <param name="data">Byte array containing data.</param>
/// <returns>Task with Boolean: false when the client is unknown; otherwise the result of the stream-based overload.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
public async Task<bool> SendAsync(string ipPort, Dictionary<object, object> metadata, byte[] data)
{
    if (String.IsNullOrEmpty(ipPort)) throw new ArgumentNullException(nameof(ipPort));

    // Only existence matters here; the stream-based overload fetches the client itself,
    // so the previously unused local is gone.
    if (!_Clients.ContainsKey(ipPort))
    {
        Logger?.Invoke("[WatsonTcpServer] Unable to find client " + ipPort);
        return false;
    }

    if (data == null) data = new byte[0];
    WatsonCommon.BytesToStream(data, out long contentLength, out Stream stream);

    // ConfigureAwait(false): this library code does not need the caller's synchronization
    // context, and not capturing it avoids deadlocks when a caller blocks on the returned task.
    return await SendAsync(ipPort, metadata, contentLength, stream).ConfigureAwait(false);
}
/// <summary>
/// Send stream data to the specified client, asynchronously, without metadata.
/// Forwards to the metadata-accepting overload with null metadata.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="contentLength">The number of bytes in the stream.</param>
/// <param name="stream">The stream containing the data.</param>
/// <returns>Task with Boolean indicating if the message was sent successfully.</returns>
public async Task<bool> SendAsync(string ipPort, long contentLength, Stream stream)
{
    // ConfigureAwait(false): this library code does not need the caller's synchronization
    // context, and not capturing it avoids deadlocks when a caller blocks on the returned task.
    return await SendAsync(ipPort, null, contentLength, stream).ConfigureAwait(false);
}
/// <summary>
/// Send stream data and metadata to the specified client, asynchronously.
/// Builds a message from the arguments and the current compression setting, then passes it
/// to the internal asynchronous send routine. A null stream is replaced with an empty in-memory stream.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="metadata">Dictionary containing metadata.</param>
/// <param name="contentLength">The number of bytes in the stream.</param>
/// <param name="stream">The stream containing the data.</param>
/// <returns>Task with Boolean: false when the client is unknown; otherwise the result of the internal send.</returns>
/// <exception cref="ArgumentException">Thrown when <paramref name="contentLength"/> is negative.</exception>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
public async Task<bool> SendAsync(string ipPort, Dictionary<object, object> metadata, long contentLength, Stream stream)
{
    if (contentLength < 0) throw new ArgumentException("Content length must be zero or greater.");
    if (String.IsNullOrEmpty(ipPort)) throw new ArgumentNullException(nameof(ipPort));
    if (!_Clients.TryGetValue(ipPort, out ClientMetadata client))
    {
        Logger?.Invoke("[WatsonTcpServer] Unable to find client " + ipPort);
        return false;
    }
    if (stream == null) stream = new MemoryStream(new byte[0]);

    // The logger is only attached to the message when message debugging is enabled.
    WatsonMessage msg = new WatsonMessage(metadata, contentLength, stream, false, false, null, null, Compression, (DebugMessages ? Logger : null));

    // ConfigureAwait(false): this library code does not need the caller's synchronization
    // context, and not capturing it avoids deadlocks when a caller blocks on the returned task.
    return await SendInternalAsync(client, msg, contentLength, stream).ConfigureAwait(false);
}
/// <summary>
/// Send only metadata to the specified client, asynchronously, with an empty payload.
/// </summary>
/// <param name="ipPort">IP:port of the recipient client.</param>
/// <param name="metadata">Dictionary containing metadata.</param>
/// <returns>Task with Boolean: false when the client is unknown; otherwise the result of the internal send.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
public async Task<bool> SendAsync(string ipPort, Dictionary<object, object> metadata)
{
    if (String.IsNullOrEmpty(ipPort)) throw new ArgumentNullException(nameof(ipPort));
    if (!_Clients.TryGetValue(ipPort, out ClientMetadata client))
    {
        Logger?.Invoke("[WatsonTcpServer] Unable to find client " + ipPort);
        return false;
    }

    // The message and the send call each receive their own empty stream.
    // The logger is only attached to the message when message debugging is enabled.
    WatsonMessage msg = new WatsonMessage(metadata, 0, new MemoryStream(new byte[0]), false, false, null, null, Compression, (DebugMessages ? Logger : null));

    // ConfigureAwait(false): this library code does not need the caller's synchronization
    // context, and not capturing it avoids deadlocks when a caller blocks on the returned task.
    return await SendInternalAsync(client, msg, 0, new MemoryStream(new byte[0])).ConfigureAwait(false);
}
/// <summary>
/// Send string data without metadata and wait up to the given number of milliseconds for a response.
/// A TimeoutException will be thrown if a response is not received.
/// Forwards to the metadata-accepting overload with null metadata.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
/// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.</param>
/// <param name="data">Data to send.</param>
/// <returns>SyncResponse.</returns>
public SyncResponse SendAndWait(string ipPort, int timeoutMs, string data) => SendAndWait(ipPort, null, timeoutMs, data);
/// <summary>
/// Send a byte array without metadata and wait up to the given number of milliseconds for a response.
/// A TimeoutException will be thrown if a response is not received.
/// Forwards to the metadata-accepting overload with null metadata.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
/// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.</param>
/// <param name="data">Data to send.</param>
/// <returns>SyncResponse.</returns>
public SyncResponse SendAndWait(string ipPort, int timeoutMs, byte[] data) => SendAndWait(ipPort, null, timeoutMs, data);
/// <summary>
/// Send stream data without metadata and wait up to the given number of milliseconds for a response.
/// A TimeoutException will be thrown if a response is not received.
/// Forwards to the metadata-accepting overload with null metadata.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
/// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.</param>
/// <param name="contentLength">The number of bytes to send from the supplied stream.</param>
/// <param name="stream">Stream containing data.</param>
/// <returns>SyncResponse.</returns>
public SyncResponse SendAndWait(string ipPort, int timeoutMs, long contentLength, Stream stream) => SendAndWait(ipPort, null, timeoutMs, contentLength, stream);
/// <summary>
/// Send string data with metadata and wait up to the given number of milliseconds for a response.
/// A TimeoutException will be thrown if a response is not received.
/// The string is encoded as UTF-8; a null or empty string is sent as an empty payload.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
/// <param name="metadata">Metadata dictionary to attach to the message.</param>
/// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.</param>
/// <param name="data">Data to send.</param>
/// <returns>SyncResponse.</returns>
public SyncResponse SendAndWait(string ipPort, Dictionary<object, object> metadata, int timeoutMs, string data)
{
    byte[] payload = String.IsNullOrEmpty(data)
        ? new byte[0]
        : Encoding.UTF8.GetBytes(data);

    return SendAndWait(ipPort, metadata, timeoutMs, payload);
}
/// <summary>
/// Send a byte array with metadata and block until the client responds or the timeout elapses.
/// The arguments are validated here, the bytes are wrapped in a stream, and the work is handed to the stream overload.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
/// <param name="metadata">Metadata dictionary to attach to the message.</param>
/// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.</param>
/// <param name="data">Data to send. Null is treated as an empty payload.</param>
/// <returns>SyncResponse.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
/// <exception cref="ArgumentException">Thrown when <paramref name="timeoutMs"/> is below 1000.</exception>
/// <exception cref="KeyNotFoundException">Thrown when no client is registered under <paramref name="ipPort"/>.</exception>
public SyncResponse SendAndWait(string ipPort, Dictionary<object, object> metadata, int timeoutMs, byte[] data)
{
    if (String.IsNullOrEmpty(ipPort))
    {
        throw new ArgumentNullException(nameof(ipPort));
    }

    if (timeoutMs < 1000)
    {
        throw new ArgumentException("Timeout milliseconds must be 1000 or greater.");
    }

    // Only the existence of the client matters here; the stream overload looks it up again.
    bool clientKnown = _Clients.TryGetValue(ipPort, out ClientMetadata _);
    if (!clientKnown)
    {
        Logger?.Invoke("[WatsonTcpServer] Unable to find client " + ipPort);
        throw new KeyNotFoundException("Unable to find client " + ipPort + ".");
    }

    byte[] payload = data ?? new byte[0];
    WatsonCommon.BytesToStream(payload, out long payloadLength, out Stream payloadStream);
    return SendAndWait(ipPort, metadata, timeoutMs, payloadLength, payloadStream);
}
/// <summary>
/// Send stream contents with metadata and block until the client responds or the timeout elapses.
/// Builds a synchronous-request message tagged with a fresh conversation GUID and an expiration of now + <paramref name="timeoutMs"/>.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
/// <param name="metadata">Metadata dictionary to attach to the message.</param>
/// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.</param>
/// <param name="contentLength">The number of bytes to send from the supplied stream.</param>
/// <param name="stream">Stream containing data. Null is replaced with an empty stream.</param>
/// <returns>SyncResponse.</returns>
/// <exception cref="ArgumentException">Thrown when <paramref name="contentLength"/> is negative or <paramref name="timeoutMs"/> is below 1000.</exception>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
/// <exception cref="KeyNotFoundException">Thrown when no client is registered under <paramref name="ipPort"/>.</exception>
public SyncResponse SendAndWait(string ipPort, Dictionary<object, object> metadata, int timeoutMs, long contentLength, Stream stream)
{
    if (contentLength < 0)
    {
        throw new ArgumentException("Content length must be zero or greater.");
    }

    if (String.IsNullOrEmpty(ipPort))
    {
        throw new ArgumentNullException(nameof(ipPort));
    }

    if (timeoutMs < 1000)
    {
        throw new ArgumentException("Timeout milliseconds must be 1000 or greater.");
    }

    bool clientKnown = _Clients.TryGetValue(ipPort, out ClientMetadata target);
    if (!clientKnown)
    {
        Logger?.Invoke("[WatsonTcpServer] Unable to find client " + ipPort);
        throw new KeyNotFoundException("Unable to find client " + ipPort + ".");
    }

    Stream payload = stream ?? new MemoryStream(new byte[0]);
    DateTime deadline = DateTime.Now.AddMilliseconds(timeoutMs);
    string conversationGuid = Guid.NewGuid().ToString();

    WatsonMessage request = new WatsonMessage(
        metadata,
        contentLength,
        payload,
        true,
        false,
        deadline,
        conversationGuid,
        Compression,
        (DebugMessages ? Logger : null));

    return SendAndWaitInternal(target, request, timeoutMs, contentLength, payload);
}
/// <summary>
/// Send a metadata-only message (zero-length payload) and block until the client responds or the timeout elapses.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
/// <param name="metadata">Metadata dictionary to attach to the message.</param>
/// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.</param>
/// <returns>SyncResponse.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
/// <exception cref="ArgumentException">Thrown when <paramref name="timeoutMs"/> is below 1000.</exception>
/// <exception cref="KeyNotFoundException">Thrown when no client is registered under <paramref name="ipPort"/>.</exception>
public SyncResponse SendAndWait(string ipPort, Dictionary<object, object> metadata, int timeoutMs)
{
    if (String.IsNullOrEmpty(ipPort))
    {
        throw new ArgumentNullException(nameof(ipPort));
    }

    if (timeoutMs < 1000)
    {
        throw new ArgumentException("Timeout milliseconds must be 1000 or greater.");
    }

    bool clientKnown = _Clients.TryGetValue(ipPort, out ClientMetadata target);
    if (!clientKnown)
    {
        Logger?.Invoke("[WatsonTcpServer] Unable to find client " + ipPort);
        throw new KeyNotFoundException("Unable to find client " + ipPort + ".");
    }

    DateTime deadline = DateTime.Now.AddMilliseconds(timeoutMs);

    // The message and the send call each receive their own empty stream.
    MemoryStream messageStream = new MemoryStream(new byte[0]);
    WatsonMessage request = new WatsonMessage(
        metadata,
        0,
        messageStream,
        true,
        false,
        deadline,
        Guid.NewGuid().ToString(),
        Compression,
        (DebugMessages ? Logger : null));

    MemoryStream sendStream = new MemoryStream(new byte[0]);
    return SendAndWaitInternal(target, request, timeoutMs, 0, sendStream);
}
/// <summary>
/// Report whether a client with the given IP:port is currently present in the server's client list.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
/// <returns>True if the client is present in the client list; otherwise false.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
public bool IsClientConnected(string ipPort)
{
    if (String.IsNullOrEmpty(ipPort))
    {
        throw new ArgumentNullException(nameof(ipPort));
    }

    bool tracked = _Clients.TryGetValue(ipPort, out ClientMetadata _);
    return tracked;
}
/// <summary>
/// Return the IP:port key of every client currently in the server's client list.
/// </summary>
/// <returns>A new list holding a copy of the client keys at the time of the call.</returns>
public IEnumerable<string> ListClients()
{
    List<string> ipPorts = new List<string>(_Clients.Keys);
    return ipPorts;
}
/// <summary>
/// Disconnects the specified client: sends it a message with status Removed, disposes it, and removes it from the client list.
/// If the client is unknown, the condition is logged and nothing else happens.
/// </summary>
/// <param name="ipPort">IP:port of the client.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="ipPort"/> is null or empty.</exception>
public void DisconnectClient(string ipPort)
{
    if (String.IsNullOrEmpty(ipPort)) throw new ArgumentNullException(nameof(ipPort));

    if (!_Clients.TryGetValue(ipPort, out ClientMetadata client))
    {
        Logger?.Invoke("[WatsonTcpServer] Unable to find client " + ipPort);
        return;
    }

    // A client already flagged in _ClientsTimedout (done by the idle monitor before it calls this method)
    // is a timeout; any other disconnect is recorded as a kick.
    // The removal message carries no payload, so no explanatory text is built here.
    if (!_ClientsTimedout.ContainsKey(ipPort))
    {
        _ClientsKicked.TryAdd(ipPort, DateTime.Now);
    }

    WatsonMessage removeMsg = new WatsonMessage();
    removeMsg.Status = MessageStatus.Removed;
    SendInternal(client, removeMsg, 0, null);

    client.Dispose();
    _Clients.TryRemove(ipPort, out ClientMetadata removed);
}
#endregion
#region Private-Methods
/// <summary>
/// Tear down the server: cancel and dispose the token source, close the listener socket,
/// send every tracked client a message with status Disconnecting, and dispose each client.
/// </summary>
/// <param name="disposing">Indicate if resources should be disposed. When false, nothing is done.</param>
protected virtual void Dispose(bool disposing)
{
    if (!disposing) return;

    // The original logged this with the client component's prefix; this is the server.
    Logger?.Invoke("[WatsonTcpServer] Disposing");

    if (_TokenSource != null)
    {
        if (!_TokenSource.IsCancellationRequested) _TokenSource.Cancel();
        _TokenSource.Dispose();
        _TokenSource = null;
    }

    if (_Listener != null && _Listener.Server != null)
    {
        _Listener.Server.Close();
        _Listener.Server.Dispose();
        _Listener = null;
    }

    if (_Clients != null && _Clients.Count > 0)
    {
        // One Disconnecting message instance is reused for every client.
        WatsonMessage discMsg = new WatsonMessage();
        discMsg.Status = MessageStatus.Disconnecting;

        foreach (KeyValuePair<string, ClientMetadata> currMetadata in _Clients)
        {
            SendInternal(currMetadata.Value, discMsg, 0, null);
            currMetadata.Value.Dispose();
        }

        _Clients = null;
        _UnauthenticatedClients = null;
    }

    Logger?.Invoke("[WatsonTcpServer] Dispose complete");
}
/// <summary>
/// Certificate validation callback handed to SslStream when invalid certificates are to be accepted.
/// The certificate, chain, and policy errors are not inspected; the result is simply the AcceptInvalidCertificates setting.
/// </summary>
/// <param name="sender">Unused.</param>
/// <param name="certificate">Unused.</param>
/// <param name="chain">Unused.</param>
/// <param name="sslPolicyErrors">Unused.</param>
/// <returns>The current value of AcceptInvalidCertificates.</returns>
private bool AcceptCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
    => AcceptInvalidCertificates;
/// <summary>
/// Accept loop. Starts the listener and, until cancellation is requested, accepts TCP clients,
/// rejects those whose IP is not in PermittedIPs (when that list is non-empty), and hands each
/// accepted client to FinalizeConnection (after a TLS handshake in SSL mode) on a background task.
/// The listener is stopped when the connection count reaches the maximum and restarted once it drops below.
/// </summary>
private async Task AcceptConnections()
{
    // How long to wait between checks while the listener is paused at the connection limit.
    const int pausedPollIntervalMs = 100;

    _IsListening = true;
    _Listener.Start();

    while (!_Token.IsCancellationRequested)
    {
        string ipPort = String.Empty;

        try
        {
            #region Check-for-Maximum-Connections

            if (!_IsListening && (_Connections >= _MaxConnections))
            {
                // Still at capacity: yield instead of spinning in a tight loop.
                await Task.Delay(pausedPollIntervalMs);
                continue;
            }
            else if (!_IsListening)
            {
                _Listener.Start();
                _IsListening = true;
            }

            #endregion

            #region Accept-Connection-and-Validate-IP

            TcpClient tcp = await _Listener.AcceptTcpClientAsync();
            tcp.LingerState.Enabled = false;

            string clientIp = ((IPEndPoint)tcp.Client.RemoteEndPoint).Address.ToString();
            if (PermittedIPs.Count > 0 && !PermittedIPs.Contains(clientIp))
            {
                Logger?.Invoke("[WatsonTcpServer] Rejecting connection from " + clientIp + " (not permitted)");
                tcp.Close();
                continue;
            }

            ClientMetadata client = new ClientMetadata(tcp);
            ipPort = client.IpPort;

            #endregion Accept-Connection-and-Validate-IP

            #region Check-for-Maximum-Connections

            _Connections++;
            if (_Connections >= _MaxConnections)
            {
                Logger?.Invoke("[WatsonTcpServer] Maximum connections " + _MaxConnections + " met or exceeded (currently " + _Connections + " connections), pausing listener");
                _IsListening = false;
                _Listener.Stop();
            }

            #endregion

            #region Initialize-Client-and-Finalize-Connection

            if (_Mode == Mode.Tcp)
            {
                Task unawaited = Task.Run(() => FinalizeConnection(client), _Token);
            }
            else if (_Mode == Mode.Ssl)
            {
                if (AcceptInvalidCertificates)
                {
                    client.SslStream = new SslStream(client.NetworkStream, false, new RemoteCertificateValidationCallback(AcceptCertificate));
                }
                else
                {
                    client.SslStream = new SslStream(client.NetworkStream, false);
                }

                // Await the handshake rather than blocking a pool thread on .Result.
                Task unawaited = Task.Run(async () =>
                {
                    if (await StartTls(client))
                    {
                        FinalizeConnection(client);
                    }
                }, _Token);
            }
            else
            {
                throw new ArgumentException("Unknown mode: " + _Mode.ToString());
            }

            Logger?.Invoke("[WatsonTcpServer] Accepted connection from " + client.IpPort);

            #endregion
        }
        catch (ObjectDisposedException)
        {
            // The loop is not exited here; it ends when the while condition observes cancellation.
            Logger?.Invoke("[WatsonTcpServer] Disposal detected, closing connection listener");
        }
        catch (Exception e)
        {
            Logger?.Invoke("[WatsonTcpServer] Exception for " + ipPort + ": " + e.Message);
        }
    }
}
/// <summary>
/// Perform the server side of the TLS handshake on the client's SslStream and verify the resulting stream.
/// On any failure the client is disposed, the connection counter is decremented, and false is returned.
/// </summary>
/// <param name="client">Client whose SslStream has already been created.</param>
/// <returns>True if the handshake completed and all checks passed; otherwise false.</returns>
private async Task<bool> StartTls(ClientMetadata client)
{
try
{
// Arguments: server certificate, require a client certificate (true), TLS 1.2 only,
// and check certificate revocation unless invalid certificates are accepted.
await client.SslStream.AuthenticateAsServerAsync(_SslCertificate, true, SslProtocols.Tls12, !AcceptInvalidCertificates);
if (!client.SslStream.IsEncrypted)
{
Logger?.Invoke("[WatsonTcpServer] Stream from " + client.IpPort + " not encrypted");
client.Dispose();
_Connections--;
return false;
}
if (!client.SslStream.IsAuthenticated)
{
Logger?.Invoke("[WatsonTcpServer] Stream from " + client.IpPort + " not authenticated");
client.Dispose();
_Connections--;
return false;
}
// Mutual authentication is only enforced when MutuallyAuthenticate is set.
if (MutuallyAuthenticate && !client.SslStream.IsMutuallyAuthenticated)
{
Logger?.Invoke("[WatsonTcpServer] Stream from " + client.IpPort + " failed mutual authentication");
client.Dispose();
_Connections--;
return false;
}
}
catch (IOException ex)
{
// Some type of problem initiating the SSL connection
// The switch only selects which log line to write; every case falls through to the same cleanup below.
// NOTE(review): matching on exact exception message text looks fragile (messages may differ by runtime or locale) - confirm.
switch (ex.Message)
{
case "Authentication failed because the remote party has closed the transport stream.":
case "Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host.":
Logger?.Invoke("[WatsonTcpServer] Connection closed by " + client.IpPort + " during SSL negotiation");
break;
case "The handshake failed due to an unexpected packet format.":
Logger?.Invoke("[WatsonTcpServer] Disconnected " + client.IpPort + " due to invalid handshake");
break;
default:
Logger?.Invoke(
"[WatsonTcpServer] Disconnected " + client.IpPort + " due to TLS exception: " +
Environment.NewLine +
SerializationHelper.SerializeJson(ex, true));
break;
}
client.Dispose();
_Connections--;
return false;
}
catch (Exception ex)
{
// Any non-IO failure is logged with its full text and handled the same way.
Logger?.Invoke("[WatsonTcpServer] Exception on " + client.IpPort + " during TLS negotiation: " + Environment.NewLine + ex.ToString());
client.Dispose();
_Connections--;
return false;
}
return true;
}
/// <summary>
/// Register a newly accepted client, request authentication from it when a preshared key is configured,
/// raise ClientConnected, and start the client's data receiver on a background task.
/// </summary>
/// <param name="client">The accepted client.</param>
private void FinalizeConnection(ClientMetadata client)
{
    #region Add-to-Client-List

    _Clients.TryAdd(client.IpPort, client);
    _ClientsLastSeen.TryAdd(client.IpPort, DateTime.Now);

    #endregion

    #region Request-Authentication

    if (!String.IsNullOrEmpty(PresharedKey))
    {
        Logger?.Invoke("[WatsonTcpServer] Soliciting authentication material from " + client.IpPort);
        _UnauthenticatedClients.TryAdd(client.IpPort, DateTime.Now);

        // The AuthRequired message is sent with no payload.
        WatsonMessage authMsg = new WatsonMessage();
        authMsg.Status = MessageStatus.AuthRequired;
        SendInternal(client, authMsg, 0, null);
    }

    #endregion

    #region Start-Data-Receiver

    Logger?.Invoke("[WatsonTcpServer] Starting data receiver for " + client.IpPort);
    ClientConnected?.Invoke(this, new ClientConnectedEventArgs(client.IpPort));

    // Fire-and-forget: the receiver task runs for the lifetime of the connection.
    Task.Run(async () => await DataReceiver(client, client.Token));

    #endregion
}
/// <summary>
/// Probe whether the client's socket is still usable. First attempts a zero-byte send under the send lock;
/// if that does not succeed, falls back to a poll plus a peek-receive under the receive lock.
/// </summary>
/// <param name="client">Client to test.</param>
/// <returns>True if either probe indicates the connection is alive; otherwise false.</returns>
private bool IsConnected(ClientMetadata client)
{
    if (!client.TcpClient.Connected)
    {
        return false;
    }

    // Probe 1: zero-length send.
    byte[] probe = new byte[1];
    bool sendSucceeded = false;

    try
    {
        AcquireSendLock(client.IpPort);
        client.TcpClient.Client.Send(probe, 0, 0);
        sendSucceeded = true;
    }
    catch (ObjectDisposedException)
    {
    }
    catch (IOException)
    {
    }
    catch (SocketException se)
    {
        // Native error 10035 is counted as a successful probe.
        if (se.NativeErrorCode.Equals(10035))
        {
            sendSucceeded = true;
        }
    }
    catch (Exception e)
    {
        Logger?.Invoke("[WatsonTcpServer] Exception while testing connection to " + client.IpPort + " using send: " + e.Message);
        sendSucceeded = false;
    }
    finally
    {
        if (client != null) ReleaseSendLock(client.IpPort);
    }

    if (sendSucceeded)
    {
        return true;
    }

    // Probe 2: socket must be writable and error-free, and a peek must return at least one byte.
    try
    {
        AcquireReceiveLock(client.IpPort);

        bool writable = client.TcpClient.Client.Poll(0, SelectMode.SelectWrite);
        if (!writable || client.TcpClient.Client.Poll(0, SelectMode.SelectError))
        {
            return false;
        }

        byte[] peeked = new byte[1];
        int peekedCount = client.TcpClient.Client.Receive(peeked, SocketFlags.Peek);
        return peekedCount != 0;
    }
    catch (Exception e)
    {
        Logger?.Invoke("[WatsonTcpServer] Exception while testing connection to " + client.IpPort + " using poll/peek: " + e.Message);
        return false;
    }
    finally
    {
        if (client != null) ReleaseReceiveLock(client.IpPort);
    }
}
/// <summary>
/// Per-client receive loop. Repeatedly builds a message from the client's data stream and dispatches it
/// (authentication exchange, sync request, sync response, or ordinary message/stream event) until the client
/// disconnects, cancellation is requested, or an exception occurs. After the loop ends, the client is removed
/// from the tracking collections, the connection count is decremented, ClientDisconnected is raised, and the
/// client is disposed.
/// </summary>
/// <param name="client">Client whose stream is read.</param>
/// <param name="token">Cancellation token checked at the top of every iteration.</param>
private async Task DataReceiver(ClientMetadata client, CancellationToken token)
{
while (true)
{
try
{
token.ThrowIfCancellationRequested();
if (!IsConnected(client)) break;
WatsonMessage msg = new WatsonMessage(client.DataStream, (DebugMessages ? Logger : null));
bool buildSuccess = await msg.BuildFromStream();
if (!buildSuccess)
{
Logger?.Invoke("[WatsonTcpServer] Message build failed due to disconnect for client " + client.IpPort);
break;
}
// NOTE(review): msg was assigned from `new` just above, so this branch looks unreachable - confirm before removing.
if (msg == null)
{
await Task.Delay(30);
continue;
}
// Authentication gate: only active when a preshared key is configured and the client is still
// listed as unauthenticated. Every path inside ends with `continue`, so nothing from an
// unauthenticated client reaches the dispatch code further down.
if (!String.IsNullOrEmpty(PresharedKey))
{
if (_UnauthenticatedClients.ContainsKey(client.IpPort))
{
Logger?.Invoke("[WatsonTcpServer] Message received from unauthenticated endpoint " + client.IpPort);
if (msg.Status == MessageStatus.AuthRequested)
{
// check preshared key
if (msg.PresharedKey != null && msg.PresharedKey.Length > 0)
{
// Both keys are trimmed before comparison.
string clientPsk = Encoding.UTF8.GetString(msg.PresharedKey).Trim();
if (PresharedKey.Trim().Equals(clientPsk))
{
Logger?.Invoke("[WatsonTcpServer] Accepted authentication for " + client.IpPort);
_UnauthenticatedClients.TryRemove(client.IpPort, out DateTime dt);
AuthenticationSucceeded?.Invoke(this, new AuthenticationSucceededEventArgs(client.IpPort));
byte[] data = Encoding.UTF8.GetBytes("Authentication successful");
WatsonCommon.BytesToStream(data, out long contentLength, out Stream stream);
WatsonMessage authMsg = new WatsonMessage(null, contentLength, stream, false, false, null, null, CompressionType.None, (DebugMessages ? Logger : null));
authMsg.Status = MessageStatus.AuthSuccess;
// Sent with length 0 and a null stream, so only the message headers go out here.
SendInternal(client, authMsg, 0, null);
continue;
}
else
{
Logger?.Invoke("[WatsonTcpServer] Declined authentication for " + client.IpPort);
byte[] data = Encoding.UTF8.GetBytes("Authentication declined");
AuthenticationFailed?.Invoke(this, new AuthenticationFailedEventArgs(client.IpPort));
WatsonCommon.BytesToStream(data, out long contentLength, out Stream stream);
WatsonMessage authMsg = new WatsonMessage(null, contentLength, stream, false, false, null, null, CompressionType.None, (DebugMessages ? Logger : null));
authMsg.Status = MessageStatus.AuthFailure;
SendInternal(client, authMsg, 0, null);
continue;
}
}
else
{
// AuthRequested arrived without any key bytes.
Logger?.Invoke("[WatsonTcpServer] No authentication material for " + client.IpPort);
byte[] data = Encoding.UTF8.GetBytes("No authentication material");
AuthenticationFailed?.Invoke(this, new AuthenticationFailedEventArgs(client.IpPort));
WatsonCommon.BytesToStream(data, out long contentLength, out Stream stream);
WatsonMessage authMsg = new WatsonMessage(null, contentLength, stream, false, false, null, null, CompressionType.None, (DebugMessages ? Logger : null));
authMsg.Status = MessageStatus.AuthFailure;
SendInternal(client, authMsg, 0, null);
continue;
}
}
else
{
// decline the message
// NOTE(review): this log line reuses the "No authentication material" text although the message
// was simply not an AuthRequested message - confirm whether a distinct log text was intended.
Logger?.Invoke("[WatsonTcpServer] No authentication material for " + client.IpPort);
byte[] data = Encoding.UTF8.GetBytes("Authentication required");
AuthenticationRequested?.Invoke(this, new AuthenticationRequestedEventArgs(client.IpPort));
WatsonCommon.BytesToStream(data, out long contentLength, out Stream stream);
WatsonMessage authMsg = new WatsonMessage(null, contentLength, stream, false, false, null, null, CompressionType.None, (DebugMessages ? Logger : null));
authMsg.Status = MessageStatus.AuthRequired;
SendInternal(client, authMsg, 0, null);
continue;
}
}
}
// Either of these statuses ends the receive loop.
if (msg.Status == MessageStatus.Disconnecting)
{
Logger?.Invoke("[WatsonTcpServer] Received notification of disconnection from " + client.IpPort);
break;
}
else if (msg.Status == MessageStatus.Removed)
{
Logger?.Invoke("[WatsonTcpServer] Sent notification of removal to " + client.IpPort);
break;
}
if (msg.SyncRequest)
{
// Synchronous request from the client: the payload is read even when no handler is attached
// (presumably so the stream is left positioned at the next message - TODO confirm).
DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg);
byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _StreamBufferSize);
if (SyncRequestReceived != null)
{
if (DateTime.Now < expiration)
{
SyncRequest syncReq = new SyncRequest(
client.IpPort,
msg.ConversationGuid,
msg.Expiration.Value,
msg.Metadata,
msgData);
SyncResponse syncResp = SyncRequestReceived(syncReq);
// A null response from the handler means nothing is sent back.
if (syncResp != null)
{
WatsonCommon.BytesToStream(syncResp.Data, out long contentLength, out Stream stream);
// The reply carries the request's expiration and conversation GUID.
WatsonMessage respMsg = new WatsonMessage(
syncResp.Metadata,
contentLength,
stream,
false,
true,
msg.Expiration.Value,
msg.ConversationGuid,
Compression,
(DebugMessages ? Logger : null));
SendInternal(client, respMsg, contentLength, stream);
}
}
else
{
Logger?.Invoke("[WatsonTcpServer] Expired synchronous request received and discarded from " + client.IpPort);
}
}
}
else if (msg.SyncResponse)
{
// No need to amend message expiration; it is copied from the request, which was set by this node
// DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg);
byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _StreamBufferSize);
if (DateTime.Now < msg.Expiration.Value)
{
// Stored under the conversation GUID; GetSyncResponse polls _SyncResponses for that key.
// NOTE(review): Add is used rather than an indexer set; if _SyncResponses is a Dictionary this
// would throw on a duplicate GUID - confirm against the field declaration.
lock (_SyncResponseLock)
{
_SyncResponses.Add(msg.ConversationGuid, new SyncResponse(msg.Expiration.Value, msg.Metadata, msgData));
}
}
else
{
Logger?.Invoke("[WatsonTcpServer] Expired synchronous response received and discarded from " + client.IpPort);
}
}
else
{
// Ordinary message: MessageReceived takes priority over StreamReceived when both have handlers.
byte[] msgData = null;
// ms is only reassigned and used in the compressed StreamReceived branches below.
MemoryStream ms = new MemoryStream();
if (_MessageReceived != null && _MessageReceived.GetInvocationList().Length > 0)
{
msgData = await WatsonCommon.ReadMessageDataAsync(msg, _StreamBufferSize);
MessageReceivedFromClientEventArgs mr = new MessageReceivedFromClientEventArgs(client.IpPort, msg.Metadata, msgData);
_MessageReceived?.Invoke(this, mr);
}
else if (_StreamReceived != null && _StreamReceived.GetInvocationList().Length > 0)
{
StreamReceivedFromClientEventArgs sr = null;
if (msg.Compression == CompressionType.None)
{
// Uncompressed: the handler is given the message's own data stream.
sr = new StreamReceivedFromClientEventArgs(client.IpPort, msg.Metadata, msg.ContentLength, msg.DataStream);
_StreamReceived?.Invoke(this, sr);
}
else if (msg.Compression == CompressionType.Deflate)
{
// Compressed: the payload is fully decompressed into memory and the handler is given a MemoryStream.
// The event args still carry msg.ContentLength, not the decompressed byte count.
using (DeflateStream ds = new DeflateStream(msg.DataStream, CompressionMode.Decompress, true))
{
msgData = WatsonCommon.ReadStreamFully(ds);
ms = new MemoryStream(msgData);
ms.Seek(0, SeekOrigin.Begin);
sr = new StreamReceivedFromClientEventArgs(client.IpPort, msg.Metadata, msg.ContentLength, ms);
_StreamReceived?.Invoke(this, sr);
}
}
else if (msg.Compression == CompressionType.Gzip)
{
using (GZipStream gs = new GZipStream(msg.DataStream, CompressionMode.Decompress, true))
{
msgData = WatsonCommon.ReadStreamFully(gs);
ms = new MemoryStream(msgData);
ms.Seek(0, SeekOrigin.Begin);
sr = new StreamReceivedFromClientEventArgs(client.IpPort, msg.Metadata, msg.ContentLength, ms);
_StreamReceived?.Invoke(this, sr);
}
}
else
{
throw new InvalidOperationException("Unknown compression type: " + msg.Compression.ToString());
}
}
else
{
// With no handler attached the receive loop is terminated, which disconnects the client below.
Logger?.Invoke("[WatsonTcpServer] Event handler not set for either MessageReceived or StreamReceived");
break;
}
}
// Reached for sync requests, sync responses and ordinary messages (authentication paths `continue` earlier).
_Stats.ReceivedMessages = _Stats.ReceivedMessages + 1;
_Stats.ReceivedBytes += msg.ContentLength;
UpdateClientLastSeen(client.IpPort);
}
catch (ObjectDisposedException)
{
Logger?.Invoke("[WatsonTcpServer] Client " + client.IpPort + " disconnected due to disposal");
break;
}
catch (OperationCanceledException)
{
Logger?.Invoke("[WatsonTcpServer] Cancellation requested");
break;
}
catch (Exception e)
{
Logger?.Invoke(
"[WatsonTcpServer] Data receiver exception for " + client.IpPort + ":" +
Environment.NewLine +
SerializationHelper.SerializeJson(e, true) +
Environment.NewLine);
break;
}
}
Logger?.Invoke("[WatsonTcpServer] Data receiver terminated for " + client.IpPort);
// The disconnect reason is determined before the kicked/timed-out entries are removed below.
// The event args are only built when ClientDisconnected has a subscriber.
ClientDisconnectedEventArgs cd = null;
if (ClientDisconnected != null)
{
if (_ClientsKicked.ContainsKey(client.IpPort))
{
cd = new ClientDisconnectedEventArgs(client.IpPort, DisconnectReason.Kicked);
}
else if (_ClientsTimedout.ContainsKey(client.IpPort))
{
cd = new ClientDisconnectedEventArgs(client.IpPort, DisconnectReason.Timeout);
}
else
{
cd = new ClientDisconnectedEventArgs(client.IpPort, DisconnectReason.Normal);
}
}
// Remove the client from every tracking collection and release its connection slot.
DateTime removedTs;
_Clients.TryRemove(client.IpPort, out ClientMetadata removedClient);
_ClientsLastSeen.TryRemove(client.IpPort, out removedTs);
_ClientsKicked.TryRemove(client.IpPort, out removedTs);
_ClientsTimedout.TryRemove(client.IpPort, out removedTs);
_UnauthenticatedClients.TryRemove(client.IpPort, out removedTs);
_Connections--;
if (cd != null)
{
ClientDisconnected?.Invoke(this, cd);
}
Logger?.Invoke("[WatsonTcpServer] Disposing data receiver for " + client.IpPort);
client.Dispose();
}
/// <summary>
/// Write a message (headers, then payload) to a client while holding that client's send lock.
/// Write failures are logged and reported through the return value rather than thrown.
/// </summary>
/// <param name="client">Destination client.</param>
/// <param name="msg">Message whose headers are written first.</param>
/// <param name="contentLength">Number of payload bytes to send from <paramref name="stream"/>.</param>
/// <param name="stream">Payload source; must be readable when <paramref name="contentLength"/> is positive.</param>
/// <returns>True when the write completed; false when an exception occurred while writing.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="client"/> or <paramref name="msg"/> is null.</exception>
/// <exception cref="ArgumentException">Thrown when a payload is expected but the stream is null or unreadable.</exception>
private bool SendInternal(ClientMetadata client, WatsonMessage msg, long contentLength, Stream stream)
{
    if (client == null) throw new ArgumentNullException(nameof(client));
    if (msg == null) throw new ArgumentNullException(nameof(msg));

    bool expectsPayload = contentLength > 0;
    if (expectsPayload && (stream == null || !stream.CanRead))
    {
        throw new ArgumentException("Cannot read from supplied stream.");
    }

    bool delivered;
    AcquireSendLock(client.IpPort);

    try
    {
        SendHeaders(client, msg);
        SendDataStream(client, contentLength, stream);
        _Stats.SentMessages += 1;
        _Stats.SentBytes += contentLength;
        delivered = true;
    }
    catch (Exception e)
    {
        Logger?.Invoke("[WatsonTcpServer] Failed to write message to " + client.IpPort + " due to exception: " + e.Message);
        delivered = false;
    }
    finally
    {
        ReleaseSendLock(client.IpPort);
    }

    return delivered;
}
/// <summary>
/// Asynchronous counterpart of SendInternal: writes headers and payload to a client while holding
/// that client's send lock. Write failures are logged and reported through the return value.
/// </summary>
/// <param name="client">Destination client.</param>
/// <param name="msg">Message whose headers are written first.</param>
/// <param name="contentLength">Number of payload bytes to send from <paramref name="stream"/>.</param>
/// <param name="stream">Payload source; must be readable when <paramref name="contentLength"/> is positive.</param>
/// <returns>True when the write completed; false when an exception occurred while writing.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="client"/> or <paramref name="msg"/> is null.</exception>
/// <exception cref="ArgumentException">Thrown when a payload is expected but the stream is null or unreadable.</exception>
private async Task<bool> SendInternalAsync(ClientMetadata client, WatsonMessage msg, long contentLength, Stream stream)
{
    if (client == null) throw new ArgumentNullException(nameof(client));
    if (msg == null) throw new ArgumentNullException(nameof(msg));

    bool expectsPayload = contentLength > 0;
    if (expectsPayload && (stream == null || !stream.CanRead))
    {
        throw new ArgumentException("Cannot read from supplied stream.");
    }

    bool delivered;
    AcquireSendLock(client.IpPort);

    try
    {
        await SendHeadersAsync(client, msg);
        await SendDataStreamAsync(client, contentLength, stream);
        _Stats.SentMessages += 1;
        _Stats.SentBytes += contentLength;
        delivered = true;
    }
    catch (Exception e)
    {
        Logger?.Invoke("[WatsonTcpServer] Failed to write message to " + client.IpPort + " due to exception: " + e.Message);
        delivered = false;
    }
    finally
    {
        ReleaseSendLock(client.IpPort);
    }

    return delivered;
}
/// <summary>
/// Write a synchronous request to a client under its send lock, then block until the matching response
/// (keyed by the message's conversation GUID) arrives or the message's expiration passes.
/// Unlike SendInternal, a write failure is logged and rethrown.
/// </summary>
/// <param name="client">Destination client.</param>
/// <param name="msg">Request message; its ConversationGuid and Expiration drive the wait.</param>
/// <param name="timeoutMs">Not read by this method; the wait is bounded by the message's expiration.</param>
/// <param name="contentLength">Number of payload bytes to send from <paramref name="stream"/>.</param>
/// <param name="stream">Payload source; must be readable when <paramref name="contentLength"/> is positive.</param>
/// <returns>The response retrieved by GetSyncResponse.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="client"/> or <paramref name="msg"/> is null.</exception>
/// <exception cref="ArgumentException">Thrown when a payload is expected but the stream is null or unreadable.</exception>
private SyncResponse SendAndWaitInternal(ClientMetadata client, WatsonMessage msg, int timeoutMs, long contentLength, Stream stream)
{
    if (client == null) throw new ArgumentNullException(nameof(client));
    if (msg == null) throw new ArgumentNullException(nameof(msg));

    bool expectsPayload = contentLength > 0;
    if (expectsPayload && (stream == null || !stream.CanRead))
    {
        throw new ArgumentException("Cannot read from supplied stream.");
    }

    AcquireSendLock(client.IpPort);

    try
    {
        SendHeaders(client, msg);
        SendDataStream(client, contentLength, stream);
        _Stats.SentMessages += 1;
        _Stats.SentBytes += contentLength;
    }
    catch (Exception e)
    {
        Logger?.Invoke("[WatsonTcpServer] Failed to write message to " + client.IpPort + " due to exception: " + e.Message);
        throw;
    }
    finally
    {
        ReleaseSendLock(client.IpPort);
    }

    // The send lock is released before waiting so other sends to this client are not held up.
    return GetSyncResponse(msg.ConversationGuid, msg.Expiration.Value);
}
/// <summary>
/// Write the message's header bytes to the client's data stream and flush.
/// </summary>
/// <param name="client">Destination client.</param>
/// <param name="msg">Message supplying HeaderBytes.</param>
private void SendHeaders(ClientMetadata client, WatsonMessage msg)
{
    byte[] header = msg.HeaderBytes;
    int headerLength = header.Length;

    client.DataStream.Write(header, 0, headerLength);
    client.DataStream.Flush();
}
/// <summary>
/// Asynchronously write the message's header bytes to the client's data stream and flush.
/// </summary>
/// <param name="client">Destination client.</param>
/// <param name="msg">Message supplying HeaderBytes.</param>
private async Task SendHeadersAsync(ClientMetadata client, WatsonMessage msg)
{
    byte[] header = msg.HeaderBytes;
    int headerLength = header.Length;

    await client.DataStream.WriteAsync(header, 0, headerLength);
    await client.DataStream.FlushAsync();
}
/// <summary>
/// Copy exactly <paramref name="contentLength"/> bytes from <paramref name="stream"/> to the client's data stream,
/// compressing them according to the Compression setting, then flush the client stream.
/// Does nothing when <paramref name="contentLength"/> is zero or negative.
/// </summary>
/// <param name="client">Destination client.</param>
/// <param name="contentLength">Number of bytes to read from <paramref name="stream"/>.</param>
/// <param name="stream">Payload source.</param>
/// <exception cref="EndOfStreamException">Thrown when the source ends before <paramref name="contentLength"/> bytes were read.</exception>
/// <exception cref="InvalidOperationException">Thrown when Compression holds an unrecognised value.</exception>
private void SendDataStream(ClientMetadata client, long contentLength, Stream stream)
{
    if (contentLength <= 0) return;

    byte[] buffer = new byte[_StreamBufferSize];

    if (Compression == CompressionType.None)
    {
        CopyExactBytes(stream, client.DataStream, contentLength, buffer);
    }
    else if (Compression == CompressionType.Gzip)
    {
        // leaveOpen = true: disposing the compressor must not close the client's stream.
        using (GZipStream gs = new GZipStream(client.DataStream, CompressionMode.Compress, true))
        {
            CopyExactBytes(stream, gs, contentLength, buffer);
            gs.Flush();
        }
    }
    else if (Compression == CompressionType.Deflate)
    {
        using (DeflateStream ds = new DeflateStream(client.DataStream, CompressionMode.Compress, true))
        {
            CopyExactBytes(stream, ds, contentLength, buffer);
            ds.Flush();
        }
    }
    else
    {
        throw new InvalidOperationException("Unknown compression type: " + Compression.ToString());
    }

    client.DataStream.Flush();
}

/// <summary>
/// Copy exactly <paramref name="count"/> bytes from <paramref name="source"/> to <paramref name="destination"/>.
/// Never reads past <paramref name="count"/>, and fails instead of looping forever if the source runs dry.
/// </summary>
private static void CopyExactBytes(Stream source, Stream destination, long count, byte[] buffer)
{
    long remaining = count;

    while (remaining > 0)
    {
        // Cap each read so bytes beyond the declared length are never pulled from the source.
        int toRead = (int)Math.Min(buffer.Length, remaining);
        int read = source.Read(buffer, 0, toRead);

        if (read <= 0)
        {
            // Stream.Read returning 0 signals end of stream; retrying would spin forever.
            throw new EndOfStreamException("Source stream ended with " + remaining + " of " + count + " bytes still to send.");
        }

        destination.Write(buffer, 0, read);
        remaining -= read;
    }
}
/// <summary>
/// Asynchronously copy exactly <paramref name="contentLength"/> bytes from <paramref name="stream"/> to the client's
/// data stream, compressing them according to the Compression setting, then flush the client stream.
/// Does nothing when <paramref name="contentLength"/> is zero or negative.
/// </summary>
/// <param name="client">Destination client.</param>
/// <param name="contentLength">Number of bytes to read from <paramref name="stream"/>.</param>
/// <param name="stream">Payload source.</param>
/// <exception cref="EndOfStreamException">Thrown when the source ends before <paramref name="contentLength"/> bytes were read.</exception>
/// <exception cref="InvalidOperationException">Thrown when Compression holds an unrecognised value.</exception>
private async Task SendDataStreamAsync(ClientMetadata client, long contentLength, Stream stream)
{
    if (contentLength <= 0) return;

    byte[] buffer = new byte[_StreamBufferSize];

    if (Compression == CompressionType.None)
    {
        await CopyExactBytesAsync(stream, client.DataStream, contentLength, buffer);
    }
    else if (Compression == CompressionType.Gzip)
    {
        // leaveOpen = true: disposing the compressor must not close the client's stream.
        using (GZipStream gs = new GZipStream(client.DataStream, CompressionMode.Compress, true))
        {
            await CopyExactBytesAsync(stream, gs, contentLength, buffer);
            await gs.FlushAsync();
        }
    }
    else if (Compression == CompressionType.Deflate)
    {
        using (DeflateStream ds = new DeflateStream(client.DataStream, CompressionMode.Compress, true))
        {
            await CopyExactBytesAsync(stream, ds, contentLength, buffer);
            await ds.FlushAsync();
        }
    }
    else
    {
        throw new InvalidOperationException("Unknown compression type: " + Compression.ToString());
    }

    await client.DataStream.FlushAsync();
}

/// <summary>
/// Asynchronously copy exactly <paramref name="count"/> bytes from <paramref name="source"/> to <paramref name="destination"/>.
/// Never reads past <paramref name="count"/>, and fails instead of looping forever if the source runs dry.
/// </summary>
private static async Task CopyExactBytesAsync(Stream source, Stream destination, long count, byte[] buffer)
{
    long remaining = count;

    while (remaining > 0)
    {
        // Cap each read so bytes beyond the declared length are never pulled from the source.
        int toRead = (int)Math.Min(buffer.Length, remaining);
        int read = await source.ReadAsync(buffer, 0, toRead);

        if (read <= 0)
        {
            // A read of 0 bytes signals end of stream; retrying would spin forever.
            throw new EndOfStreamException("Source stream ended with " + remaining + " of " + count + " bytes still to send.");
        }

        await destination.WriteAsync(buffer, 0, read);
        remaining -= read;
    }
}
/// <summary>
/// Background loop that runs the idle-client sweep every five seconds until cancellation is requested.
/// The sweep is skipped when the idle timeout is not positive or no last-seen entries exist.
/// </summary>
private async Task MonitorForIdleClients()
{
    while (true)
    {
        if (_Token.IsCancellationRequested)
        {
            break;
        }

        bool timeoutEnabled = _IdleClientTimeoutSeconds > 0;
        if (timeoutEnabled && _ClientsLastSeen.Count > 0)
        {
            MonitorForIdleClientsTask();
        }

        await Task.Delay(5000, _Token);
    }
}
/// <summary>
/// Single idle sweep: every client whose last-seen timestamp is older than the idle timeout
/// is flagged as timed out and then disconnected.
/// </summary>
private void MonitorForIdleClientsTask()
{
    DateTime cutoff = DateTime.Now.AddSeconds(-1 * _IdleClientTimeoutSeconds);

    foreach (KeyValuePair<string, DateTime> lastSeen in _ClientsLastSeen)
    {
        bool stillActive = !(lastSeen.Value < cutoff);
        if (stillActive)
        {
            continue;
        }

        string idleIpPort = lastSeen.Key;

        // Flag first so DisconnectClient treats this as a timeout rather than a kick.
        _ClientsTimedout.TryAdd(idleIpPort, DateTime.Now);
        Logger?.Invoke("[WatsonTcpServer] Disconnecting client " + idleIpPort + " due to idle timeout");
        DisconnectClient(idleIpPort);
    }
}
/// <summary>
/// Record the current time as the moment the given client was last seen, adding the entry if it does not exist.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
private void UpdateClientLastSeen(string ipPort)
{
    // Single atomic add-or-replace: the entry is never briefly absent, unlike a remove followed by an add.
    // NOTE(review): relies on _ClientsLastSeen being a ConcurrentDictionary (it is used with TryAdd/TryRemove elsewhere) - confirm.
    _ClientsLastSeen[ipPort] = DateTime.Now;
}
/// <summary>
/// Background loop that, once per second, removes stored synchronous responses whose expiration has passed.
/// Runs until the token source is gone or cancellation has been requested.
/// </summary>
private async Task MonitorForExpiredSyncResponses()
{
    while (_TokenSource != null && !_TokenSource.IsCancellationRequested)
    {
        if (_Token.IsCancellationRequested)
        {
            break;
        }

        await Task.Delay(1000);

        lock (_SyncResponseLock)
        {
            // Snapshot the expired keys first; the dictionary cannot be modified while it is enumerated.
            List<string> expiredGuids = _SyncResponses
                .Where(s => s.Value.ExpirationUtc < DateTime.Now)
                .Select(s => s.Key)
                .ToList();

            foreach (string expiredGuid in expiredGuids)
            {
                Logger?.Invoke("[WatsonTcpServer] MonitorForExpiredSyncResponses expiring response " + expiredGuid);
                _SyncResponses.Remove(expiredGuid);
            }
        }
    }
}
/// <summary>
/// Block until a synchronous response stored under <paramref name="guid"/> appears, polling every 50 ms.
/// A response that is found is removed from the store before being returned.
/// </summary>
/// <param name="guid">Conversation GUID of the request being awaited.</param>
/// <param name="expirationUtc">Point in time (compared against DateTime.Now) after which waiting stops.</param>
/// <returns>The stored response.</returns>
/// <exception cref="TimeoutException">Thrown when no response was obtained before the expiration.</exception>
private SyncResponse GetSyncResponse(string guid, DateTime expirationUtc)
{
    SyncResponse found = null;
    bool finished = false;

    while (!finished)
    {
        lock (_SyncResponseLock)
        {
            if (_SyncResponses.TryGetValue(guid, out found))
            {
                _SyncResponses.Remove(guid);
                finished = true;
            }
        }

        if (!finished)
        {
            if (DateTime.Now >= expirationUtc)
            {
                finished = true;
            }
            else
            {
                Task.Delay(50).Wait();
            }
        }
    }

    if (found == null)
    {
        throw new TimeoutException("A response to a synchronous request was not received within the timeout window.");
    }

    return found;
}
/// <summary>
/// Block until the given client can be marked as "sending", i.e. until its IP:port is not already
/// present in _ClientsSending, then add it. Pair every call with ReleaseSendLock.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
private void AcquireSendLock(string ipPort)
{
    while (true)
    {
        lock (_ClientsSendLock)
        {
            if (!_ClientsSending.Contains(ipPort))
            {
                _ClientsSending.Add(ipPort);
                return;
            }
        }

        // Back off outside the monitor: waiting while holding _ClientsSendLock would stall
        // ReleaseSendLock and every other client's acquire for the whole delay.
        Task.Delay(25).Wait();
    }
}
/// <summary>
/// Clear the "sending" mark for the given client. Does nothing if the client is not marked.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
private void ReleaseSendLock(string ipPort)
{
    lock (_ClientsSendLock)
    {
        bool currentlyMarked = _ClientsSending.Contains(ipPort);
        if (!currentlyMarked)
        {
            return;
        }

        _ClientsSending.Remove(ipPort);
    }
}
/// <summary>
/// Block until the given client can be marked as "receiving", i.e. until its IP:port is not already
/// present in _ClientsReceiving, then add it. Pair every call with ReleaseReceiveLock.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
private void AcquireReceiveLock(string ipPort)
{
    while (true)
    {
        lock (_ClientsReceiveLock)
        {
            if (!_ClientsReceiving.Contains(ipPort))
            {
                _ClientsReceiving.Add(ipPort);
                return;
            }
        }

        // Back off outside the monitor: waiting while holding _ClientsReceiveLock would stall
        // ReleaseReceiveLock and every other client's acquire for the whole delay.
        Task.Delay(25).Wait();
    }
}
/// <summary>
/// Clear the "receiving" mark for the given client. Does nothing if the client is not marked.
/// </summary>
/// <param name="ipPort">The IP:port of the client.</param>
private void ReleaseReceiveLock(string ipPort)
{
    lock (_ClientsReceiveLock)
    {
        bool currentlyMarked = _ClientsReceiving.Contains(ipPort);
        if (!currentlyMarked)
        {
            return;
        }

        _ClientsReceiving.Remove(ipPort);
    }
}
#endregion
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C#
1
https://gitee.com/ntripshare/ntrip-share-base.git
git@gitee.com:ntripshare/ntrip-share-base.git
ntripshare
ntrip-share-base
NtripShareBase
master

搜索帮助