DelphiGrpc is a Delphi implementation of the gRPC protocol (http://grpc.io).
This protocol is very efficient and fast (realtime), because it is based on:
Another important benefit of gRPC is that it uses a readable contract format (https://grpc.io/docs/guides/concepts.html#service-definition).
The following features are implemented:
See Generator branch for .proto importer and generator
The following libraries are used:
Although it has been used in an important research project, it is not production ready yet. A demo is available to show it works with the gRPC endpoint of Google Speech. Another demo shows it works with the official Golang implementation of gRPC too. See Examples for an explanation of the Delphi client and server code.
Unfortunately, we can only use the ngHttp2.dll on Windows (and ngHttp2.so on Linux) but not on any mobile device (Android or iOS). There are some gRPC to REST proxies but none of them were suitable for our needs. Because gRPC uses http/2 frames, there isn't that big difference with WebSocket messages if you think about it! So our WebSocket wrapper puts every gRPC frame in a WebSocket message (using protobuffers of course), et voila!
message WSRequest {
int64 id = 1; //unique request id
string path = 2; //gRPC path, e.g. "/routeguide.RouteGuide/RouteChat"
bool close = 3; //is request/stream closed?
int64 size = 4; //grpc size
bytes grpc = 5; //gRPC frame
}
message WSResponse {
int64 id = 1; //request id
bool close = 2; //has client closed the request/stream?
int64 size = 3; //grpc size
bytes grpc = 4; //gRPC frame
}
The WebSocket server keeps track of the request id's and when a response is received with the same id, the normal gRPC handling processes the response data for the original request (streaming or callback).
Some examples to show how to use it.
Next is an example of a service definition with one "Sleep" call:
syntax = "proto3";
package testservice;
service TestService {
rpc Sleep(Time) returns (Time) {}
}
message Time {
int32 sec = 1;
}
The following Delphi interface client and server code can be made from this definition:
const
C_TestService_Path = '/testservice.TestService/';
type
TTime = record
public
[Serialize(1)] sec: UInt32;
function Serialize: TBytes;
procedure Deserialize(const aData: TBytes);
end;
ITestService = interface
function Sleep(const aTime: TTime): TTime;
end;
This interface can be used in Delphi client:
FClient: ITestService;
function TestService: ITestService;
begin
if FClient = nil then
FClient := TTestService_Client.Create( TGrpcHttp2Client.Create('127.0.0.1', 1000) );
Result := FClient;
end;
procedure Sleep(aSec: Integer);
var t: TTime;
begin
t.sec := aSec;
t := TestService().Sleep(t);
end;
The server side implementation is straight forward:
type
TTestService_Impl = class(TBaseGrpcImplementation, ITestService_Server)
protected
function Sleep(const aTime: TTime): TTime;
end;
function TTestService_Impl.Sleep(const aTime: TTime): TTime;
begin
SysUtils.Sleep( (aTime.sec * 1000) );
Result.sec := aTime.sec;
end;
And we start both a http/2 and a WebSocket server for this implementation:
FGrpcServer: TGrpcServer;
FWsServer: TGrpcWsServer;
FGrpcServer := TGrpcServer.Create(''{any}, 1000);
FGrpcServer.RegisterImplementation(C_TestService_Path, TTestService_Impl);
FGrpcServer.StartListen;
Writeln('gRPC server ready on *:1000');
FWsServer := TGrpcWsServer.Create('', 1001);
FWsServer.RegisterImplementation(C_TestService_Path, TTestService_Impl);
FWsServer.StartListen;
Writeln('gRPC WS server ready on *:1001');
A more complex (but powerful!) call is for example this CountDown call: you start this server side function with one call and it will stream back the elapsed time each second:
service TestService {
rpc CountDown(Time) returns (stream Time) {}
}
The Delphi interface is a bit more complicated, with a separate client and server interface:
type
TTimeCallback = reference to procedure(const aTime: TTime; aHasData, aClosed: Boolean);
ITestService_Client = interface
procedure CountDown(const aTime: TTime; const aResponseCallback: TTimeCallback);
end;
ICountDown_Send = interface
procedure Send(const aTime: TTime);
procedure CloseSend;
end;
ITestService_Server = interface
procedure CountDown(const aTime: TTime; const aOutputStream: ICountDown_Send);
end;
The Delphi client should supply a callback:
var
t: TTime;
begin
Log('Counting down...');
t.sec := 5;
TestService().CountDown(t,
procedure(const aTime: TTime; aHasData, aClosed: Boolean)
begin
if aHasData then
Log(Format('Count down %ds',[aTime.sec]));
if aClosed then
Log('Countdown DONE');
end);
The server has a for loop (which will run in a separate thread), streams back the time each second and closes the request:
type
TTestService_Impl = class(TBaseGrpcImplementation, ITestService_Server)
procedure CountDown(const aTime: TTime; const aOutputStream: ICountDown_Send);
end;
procedure TTestService_Impl.CountDown(const aTime: TTime; const aOutputStream: ICountDown_Send);
var
i: Integer;
t: TTime;
begin
for i := aTime.sec downto 0 do
begin
t.sec := i;
aOutputStream.Send(t);
SysUtils.Sleep(1 * 1000)
end;
aOutputStream.CloseSend;
end;
The other way around is also possible: stream separate data parts to the server and get a single response back:
service TestService {
rpc CalcSum(stream Time) returns (Time) {}
}
Again, the Delphi client and server interfaces are different:
ICalcSumStream = interface
procedure Send(const aTime: TTime);
function CloseAndRecv(out aResult: TTime): Boolean;
end;
ITestService_Client = interface
function CalcSum: ICalcSumStream;
end;
ICalcSum_Recv = interface(IGrpcMemStream)
function Recv(out aTime: TTime; aWaitTimeout: Integer): TGrpcWaitResult;
end;
ITestService_Server = interface
function CalcSum(const aInputStream: ICalcSum_Recv): TTime;
end;
The client can send more data asynchronously, the server will respond when the client closes the request:
var
s: string;
strm: ICalcSumStream;
t: TTime;
begin
strm := TestService().CalcSum();
while InputQuery('Amount', 'Amount', s) do
begin
t.sec := s.ToInteger();
strm.Send(t);
end;
if strm.CloseAndRecv(t) then
Log('Sum = ' t.sec.ToString() );
end;
The server will directly process each input, till the request is closed:
function TTestService_Impl.CalcSum(const aInputStream: ICalcSum_Recv): TTime;
var t: TTime;
begin
Result.sec := 0;
repeat
case aInputStream.Recv(t, 1 * 1000) of
wrTimeout, wrNoData: Continue;
wrData: Inc(Result.sec, t.sec);
wrClosed: Break;
end;
until False;
end;
The most powerful and complex call is the two way streaming call, which will run untill one of parties closes the call:
service TestService {
rpc UpdateableCountDown(stream Time) returns (stream Time) {}
}
The interface code has multiple types to support this:
TTimeCallback = reference to procedure(const aTime: TTime; aHasData, aClosed: Boolean);
IUpdateableCountDown_Send = interface
procedure Send(const aTime: TTime);
procedure CloseSend;
end;
ITestService_Client = interface
function UpdateableCountDown(const aResponseCallback: TTimeCallback): IUpdateableCountDown_Send;
end;
IUpdateableCountDown_Recv = interface(IGrpcMemStream)
function Recv(out aTime: TTime; aWaitTimeout: Integer): TGrpcWaitResult;
end;
ITestService_Server = interface
procedure UpdateableCountDown(const aInputStream: IUpdateableCountDown_Recv; const aOutputStream: IUpdateableCountDown_Send);
end;
The client registers a callback first and starts the call when the first amount is send:
var
s: string;
strm: IUpdateableCountDown_Send;
t: TTime;
begin
strm := TestService().UpdateableCountDown(
procedure(const aTime: TTime; aHasData, aClosed: Boolean)
begin
if aHasData then
Log(Format('Count down %ds',[aTime.sec]));
if aClosed then
Log('Countdown DONE');
end);
while InputQuery('Amount', 'Amount', s) do
begin
t.sec := s.ToInteger();
Log('Updating: ' + s);
strm.Send(t);
end;
strm.CloseSend();
end;
The server needs to wait for the first data and can process the next data in a wait loop:
procedure TTestService_Impl.UpdateableCountDown(const aInputStream: IUpdateableCountDown_Recv;
const aOutputStream: IUpdateableCountDown_Send);
var
i, iCount: Integer;
t: TTime;
begin
//wait for first data
repeat
case aInputStream.Recv(t, 1000) of
wrData: Break;
wrTimeout, wrNoData: Continue;
wrClosed: Exit;
end;
until False;
try
repeat
for i := t.sec downto 0 do
begin
t.sec := i;
aOutputStream.Send(t);
SysUtils.Sleep(1 * 1000);
case aInputStream.Recv(t, 1) of
wrData: Break; //restart repeat loop
wrClosed: Exit;
end;
if i = 0 then
Exit;
end;
until False;
finally
aOutputStream.CloseSend;
end;
end;
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。