代码拉取完成,页面将自动刷新
在项目的高速发展中我们往往会为项目引入一堆第三方的中间件来确保项目的稳健发展(如:Redis/Kafka/Zookeeper等),从而导致项目开发运维成本都直线提升。 Net的社区比尴尬选择面比较少。该组件立志于解决在Net环境中,常用的分布式工具对于第三方的依赖,以解决项目高速发展所遭遇的扩展问题。 In process 可以非常有效的降低我们的开发运维成本,优化开发体验。
基于Orleans二次开发。遵循Actor设计思想,再对大量项目瓶颈难点进行分析总结后推出的一种套解决方案。 以解决项目高速发展所遭遇的扩展问题。无第三方依赖降低运维成本,优化开发体验。
最新更新一解除对SqlServer的依赖。
直接在项目中引入 Components 项目,并还原依赖。
或者直接在包管理器中搜索:iTool.ClusterComponent
并安装
var builder = new iToolHostBuilder();
builder.UseAdoNetClustering(new AdoNetClusterOptions
{
AdoNetOptions = new AdoNetOptions
{
DataSource = "127.0.0.1,2433",
UID = "sa",
PWD = "zhuJIAN320"
},
EndpointsOptions = new EndpointsOptions
{
//AdvertisedIP = null, // 外网IP,默认为空
//Port = inputarr[0], // 指定集群端口号,默认为: 11111
//GatewayPort = inputarr[1] // 指定客户端口号,默认为: 33333
},
ClusterOptions = new ClusterIdentificationOptions(),
ResponseTimeout = TimeSpan.FromSeconds(15) // Call 超时时间
});
builder.UseStreamProvider("TestStream", 20);
var iToolHost = await builder.BuildAndStartAsync();
// 声名订阅逻辑
public class TestSubscribeStreamHandler : SubscribeQueueHandler<string>
{
string topic;
public TestSubscribeStreamHandler(string topic, string streamNamespace)
: base(topic, streamNamespace)
{
this.topic = topic;
}
public override Task OnErrorAsync(Exception ex)
{
Console.WriteLine("OnErrorAsync:" + ex.Message);
return Task.CompletedTask;
}
public async override Task OnMessageAsync(string message, StreamSequenceToken token)
{
try
{
if (token == null)
{
Console.WriteLine($"topic:{this.topic},message:{message}");
}
else
{
var key = $"{token.SequenceNumber}_{token.EventIndex}";
Console.WriteLine($"topic:{this.topic},message:{message},SequenceNumber:{token.SequenceNumber},EventIndex:{token.EventIndex}");
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
await Task.CompletedTask;
}
}
// 订阅消息
var subscribeStreamHandler = new TestSubscribeStreamHandler("topic", "groupName");
await subscribeStreamHandler.StartAsync();
// 发布消息
var handler1 = new ProducerQueueHandler("topic", "groupName");
handler1.SendMessageAsync(input + $",{DateTime.Now}");
// KeyValue
{
var storageService = clusterHostClient.GetService<IStorageService>("cacheName");
// 获取缓存
string statevalue = await storageService.GetState();
// 修改缓存
await storageService.Modify("akjsdhkasdhkjasdhkajsdhk");
// 删除缓存
await storageService.Remove();
}
// Hash
{
var service = clusterHostClient.GetService<IHashReader>("hashTableName");
// set
await service.SetAsync("fieldv", input);
// get field
var fieldv = await service.GetAsync("fieldv");
// remove field
await service.RemoveAsync("fieldv");
// remove hash table
await service.RemoveAsync();
}
// set
{
var service1 = clusterHostClient.GetService<ISetReader>("setList1");
var service2 = clusterHostClient.GetService<ISetReader>("setList2");
var service3 = clusterHostClient.GetService<ISetReader>("setList3");
// 差集
var list = await service3.GetDifferencesAsync(new string[] { "setList2" });
// 交集
list = await service3.GetIntersectAsync(new string[] { "setList2" });
// 并集
list = await service3.GetUnionAsync(new string[] { "setList1", "setList2" });
// get set list
list = await service2.GetAsync();
// 是否存在
var isExistValue = await service3.ExistsAsync("7");
// remove item
await service2.RemoveAsync("4");
// remove all
await service3.RemoveAsync();
}
// zset
{
var service = clusterHostClient.GetService<IZSetReader>("zsetList");
// set
await service.SetAsync("asdfhagsd15", 15);
await service.SetAsync("asdfhagsd16", 16);
await service.SetAsync("asdfhagsd5", 5);
await service.SetAsync("asdfhagsd4", 4);
await service.SetAsync("asdfhagsd8", 8);
await service.SetAsync("asdfhagsd17", 17);
await service.SetAsync("asdfhagsd160", 160);
await service.SetAsync("asdfhagsd222", 222);
await service.SetAsync("asdfhagsd999", 999);
// 获取有序列表
slist = await service.GetAsync();
// 获取第10位元素
value = await service.GetByIndexAsync(10);
// 获取指定 score 元素
value = await service.GetByScoreAsync(222);
// 获取起始区间元素
list = await service.GetRangeAsync(10, 333);
// 对应 remove 操作
}
{
var scopeProvider = new OrderlyWorkScopeProvider("lock_Name");
Parallel.For(1, 2000, new ParallelOptions { MaxDegreeOfParallelism = 4 }, async index =>
{
using (await scopeProvider.CreateWorkUnitScopeAsync())
{
// logic
Console.WriteLine("get lock success:{0},{1}", index, DateTime.Now);
await Task.Delay(1000);
}
});
}
{
int limit = 3;
var scopeProvider = new LimitWorkScopeProvider(limit, "test_limit_name");
Parallel.For(1, 10000, new ParallelOptions { MaxDegreeOfParallelism = 4 }, async index =>
{
using (await scopeProvider.CreateWorkUnitScopeAsync())
{
// 以获取执行权限, ps:这里将支持三个并发
Console.WriteLine("get excuter success:{0},{1}", index, DateTime.Now);
await Task.Delay(1000);
}
});
}
{
int cacheResultSize = 100;
string actionGroup = "test_request_action";
IRequestIdempotenceService requestIdempotenceService = cluster.GetService<IRequestIdempotenceService>(cacheResultSize, actionGroup);
Parallel.For(1, 2000, new ParallelOptions { MaxDegreeOfParallelism = 4 }, async index =>
{
if (await requestIdempotenceService.StartIfNotExistAsync("test_request_token" + (index % 10), 10000))
{
// 获取执行权限
Console.WriteLine("get excuter success:{0},{1}", index, DateTime.Now);
await Task.Delay(1000);
await requestIdempotenceService.SetResultAsync("test_request_token" + (index % 10), DateTime.Now);
}
else
{
// token 已处理,直接返回结果
object value = await requestIdempotenceService.GetResultAsync("test_request_token" + (index % 10));
Console.WriteLine("get result value:" + (index % 10) + value.ToString());
}
});
}
支持分布式事务,并发读写动态负载平衡。基于Lucene和自定义函数增强功能同时增加操作效率。 无需提前建表建库,提更运行时检查。降低数据库使用复杂度,和开发效率。
{
var executor = cluster.GetSqlExecutor();
await executor.ExecuteNonQueryNoResultAsync("delete locations");
await executor.ExecuteNonQueryAsync(CityGeoInfo.GetCityGeoInfos().First().ToString());
await Parallel.ForEachAsync(CityGeoInfo.GetCityGeoInfos(), async (item, canceltoken) =>
{
await executor.ExecuteNonQueryNoResultAsync(item.ToString());
});
// select distance(120.53,36.86,x,y) distance,CITY from locations where search(shoube(CITY),distmap(120.53,36.86,x,y),5) in ('三%',200)
// select distance(120.53,36.86,x,y) distance,CITY from locations where search(must(CITY),distmap(120.53,36.86,x,y)) in ('三*',1000) order by distance(120.53,36.86,x,y) desc
// search 代表走lucene
// search(name,tag) like '%打游戏%'
// search(name,tag, size: 20) like '%打游戏%'
// search(name,tag, page:1, size:20) like '%打游戏%' 分页查询
// search(name,tag, token:'previous page token', size: 20) like '%打游戏%' 深度查询
// 复合条件:
// search(shoube(CITY),distmap(120.53,36.86,x,y),5) in ('三%',200) 复合条件查询
// - shoube(...fields) 可以
// - must(...fields) 必须
// - mustnot(...fields) 必须不
// - distmap(x1,y1,x2,y2) 坐标距离索引(must) 单位km
// - distmapnot(x1,y1,x2,y2) 坐标距离索引(must not) 单位km
// order by sortdist(x,y) asc | desc 使用lucene 排序
// distance (x1,y1,x2,y2) 计算两点距离 单位km
// 接口方法
interface iSqlProvider iSqlProvider = cluster.GetSqlExecutor();
Task<(List<T> data, int total, string token)> ExecuteReaderAsync<T>(string sql, params SqliteParameter[] parameters) where T : class, new();
ValueTask<(string data, int total, string token)> ExecuteReaderAsync(string sql, params SqliteParameter[] parameters);
ValueTask<object> ExecuteScalarAsync(string sql, params SqliteParameter[] parameters);
ValueTask<long> ExecuteNonQueryAsync(string sql, params SqliteParameter[] parameters);
/// <summary>
/// 不保证并发执行顺序
/// </summary>
ValueTask ExecuteNonQueryNoResultAsync(string sql, params SqliteParameter[] parameters);
ValueTask ExecuteTransactionAsync(List<ExecuteItemOptions> executeItems);
ValueTask ExecuteTransactionOfLockTableAsync(List<ExecuteItemOptions> executeItems);
ValueTask BatchExecuteNonQueryAsync(List<ExecuteItemOptions> executeItems);
}
文件复用功能基于文件摘要(md5)
// 文件删除
[HttpDelete("{id}")]
public async Task<IActionResult> DeleteFileAsync(string id)
{
iFileService iFileService = _storageService.GetService<iFileService>(id);
await iFileService.DeleteFileAsync();
return base.Ok($"Deleted {id} successfully");
}
// 文件预览
[HttpGet("{id}/{width}/{height}/view")]
public async Task<FileStreamResult> DownloadView(string id, int width, int height)
{
_logger.LogError(10, "就是报个错");
iFileService iFileService = _storageService.GetService<iFileService>(id);
var info = await iFileService.GetFileInfoAsync();
if (info.UploadState < 200)
{
return default;
}
var fileBytes = await iFileService.GetStreamAsync(width, height);
this.Response.ContentLength = fileBytes.Length;
this.Response.Headers.Add("Accept-Ranges", "bytes");
this.Response.Headers.Add("Content-Range", "bytes 0-" + fileBytes.Length);
return new FileStreamResult(new MemoryStream(fileBytes), info.ContentType);
}
// 文件下载
[HttpGet("{id}")]
public async Task<object> DownLoadFile(string id)
{
iFileService iFileService = _storageService.GetService<iFileService>(id);
var info = await iFileService.GetFileInfoAsync();
MemoryStream fileStream = new MemoryStream(info.TotalLength);
if (info.UploadState == 200)
{
var file = await iFileService.GetStreamAsync();
await fileStream.WriteAsync(file.FileStream, 0, file.FileStream.Length);
}
else if (info.UploadState == 201)
{
int page = 0;
while (true)
{
page++;
var file = await iFileService.GetStreamAsync(page);
await fileStream.WriteAsync(file.FileStream, 0, file.FileStream.Length);
if (file.IsEndNUmber)
{
break;
}
}
}
else
{
return Results.Ok();
}
this.Response.ContentLength = info.TotalLength;
this.Response.Headers.Add("Accept-Ranges", "bytes");
this.Response.Headers.Add("Content-Range", "bytes 0-" + info.TotalLength);
fileStream.Position = 0;
return File(fileStream, info.ContentType, string.Format("{0}{1}", id, info.SuffixName));
}
// 文件管理脚本
[HttpGet("{queryScript}/query")]
public async Task<IActionResult> GetAllFileDetails(string queryScript)
{
iFileService iFileService = _storageService.GetService<iFileService>("0");
var files = await iFileService.QueryFileInfoAsync(queryScript);
return Ok(files);
}
// 文件详情查询
[HttpGet("details/{id}")]
public async Task<IActionResult> GetFileDetails(string id)
{
iFileService iFileService = _storageService.GetService<iFileService>(id);
var info = await iFileService.GetFileInfoAsync();
return Ok(info);
}
// 文件上传
[HttpPost]
[DisableRequestSizeLimit]
public async Task<List<string>> UploadFile([FromForm] List<IFormFile> Files)
{
List<string> files = new List<string>();
await Parallel.ForEachAsync(Files, async (file, token) =>
{
await using (var stream = file.OpenReadStream())
{
// Step 1 获取文件Key
var retVal = MD5.Create().ComputeHash(stream);
StringBuilder stringBuilder = new StringBuilder();
foreach (var item in retVal)
{
stringBuilder.Append(item.ToString("x2"));
}
string fileKey = stringBuilder.ToString();
if (files.Contains(fileKey))
{
files.Add(fileKey);
return;
}
files.Add(fileKey);
// Step 2 获取Service
iFileService iFileService = _storageService.GetService<iFileService>(fileKey);
if (await iFileService.IsExistsAsync())
{
// 文件已经存在
return;
//return Results.Ok(fileKey);
}
// Step 3 定义缓冲区
int bufCount = 1024 * 128; // kb
byte[] bufs = new byte[stream.Length > bufCount ? bufCount : stream.Length];
{
// 如果文件小于缓冲区大小,则直接提交
if (stream.Length <= bufCount)
{
stream.Position = 0;
await stream.ReadAsync(bufs, 0, (int)stream.Length);
}
await iFileService.UploadAsync(new UploadInfo
{
CreateDate = DateTime.Now,
FileStream = stream.Length > bufCount ? new byte[0] : bufs,
Role = "admin",
User = "zxf",
SuffixName = Path.GetExtension(file.FileName),
TotalLength = stream.Length > bufCount ? 0 : bufs.Length,
ContentType= file.ContentType
});
}
// 大文件分片上传
if (stream.Length > bufCount)
{
int index = 0, streamLength = (int)stream.Length, maxPage = (int)Math.Ceiling((decimal)streamLength / bufCount);
stream.Position = 0;
// 分片
while (true)
{
int sequence = await stream.ReadAsync(bufs, 0, bufCount);
if (sequence == 0) {
break;
}
index++;
await iFileService.UploadPieceAsync(new UploadPiece
{
Number = index,
FileStream = bufs.Take(sequence).ToArray(),
IsEndNUmber = index == maxPage
});
}
await iFileService.UploadComplatedAsync();
}
}
});
return files;
}
赶快下载项目体验吧。
如果该项目能帮助到你,就给个 star 吧。
Thanks, Jian
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。