1 Star 1 Fork 2

金泉 / freetds-samples

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
func_insertDat.c 9.00 KB
一键复制 编辑 原始数据 按行查看 历史
jinquan 提交于 2019-11-05 03:56 . add new mssqltool
#include <errno.h>
#include <getopt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "mssql_common.h"
#include "stpool.h"
#include "msglog.h"
#define STR_LEN 400 /* number of random characters generated (terminator not included) */
#define CHAR_MIN 'a'
#define CHAR_MAX 'z' /* inclusive lower/upper bound of every generated character */

/**
 * @brief Fill a buffer with STR_LEN pseudo-random characters and terminate it.
 * @Notes Every character lies in [CHAR_MIN, CHAR_MAX]. The function reseeds the
 *        process-wide rand() sequence, so the same seed always yields the same
 *        string.
 * @param random_buf destination buffer; must hold at least STR_LEN + 1 bytes.
 * @param seed       value handed to srand(); narrowed to unsigned int.
 * @retval None.
 */
void random_string(char* random_buf, unsigned long seed)
{
    srand((unsigned int)seed);
    for (int pos = 0; pos < STR_LEN; pos++) {
        random_buf[pos] = (char)(rand() % (CHAR_MAX - CHAR_MIN + 1) + CHAR_MIN);
    }
    /* The terminator belongs directly after the last generated character;
     * writing it one slot further would leave an uninitialised byte inside
     * the string. */
    random_buf[STR_LEN] = '\0';
}
/* Statement kinds stored in querytype: WRITE issues an INSERT, READ a SELECT. */
#define QTYPE_WRITE 1
#define QTYPE_READ  2

/* Settings filled in by main() and shared, by pointer, with every pool task. */
struct __main_thr_passin_para {
    struct __database_link *dblink;      /* database link information */
    char                    tabname[64]; /* target table name */
    struct __result_set    *tabcols;     /* points at the result set of the column-name query */
    int                     querytype;   /* QTYPE_WRITE or QTYPE_READ */
};
typedef struct __main_thr_passin_para MAINTHR_PASSIN_PARA;
/* Values of one table row; filled by TP_PARA_CONSTRUCT and read by MYAPP_TASK. */
struct __table_columns {
    unsigned long idx;                /* row index */
    char          name[32];           /* generated name text */
    char          code[STR_LEN + 10]; /* random payload string */
};
typedef struct __table_columns TAB_COLS;
/* Argument block of a single pool task; its address is passed as task_arg. */
struct __stpool_parameter {
    unsigned long                  wkidx;       /* worker index */
    struct __main_thr_passin_para *mainthrpara; /* settings shared by all tasks */
    struct __table_columns         columns;     /* row values used by this worker */
    MSSQL_CONN                     conn;        /* connection opened inside the task */
};
typedef struct __stpool_parameter TP_PARA;
/**
 * @brief Derive the row values of one worker from its worker index.
 * @Notes columns.idx becomes wkidx + 1, columns.name becomes
 *        "HAH.IT.INFRA.<idx>" and columns.code a random string seeded with
 *        wkidx. tppara->wkidx must be set before the call.
 * @param tppara task parameter block to fill; must not be NULL.
 * @retval None.
 */
void TP_PARA_CONSTRUCT(struct __stpool_parameter* tppara)
{
    const unsigned long rowidx = tppara->wkidx + 1;

    tppara->columns.idx = rowidx;
    /* %lu matches the unsigned long argument, and snprintf keeps the text
     * inside name[] even when the index has many digits. */
    snprintf(tppara->columns.name, sizeof tppara->columns.name,
             "HAH.IT.INFRA.%lu", rowidx);
    random_string(tppara->columns.code, tppara->wkidx);
}
/**
 * @brief Pool task: run one INSERT or SELECT statement for a single worker.
 * @Notes Task routines of the stpool thread pool use this fixed signature.
 *        The statement is built first; the connection is only opened once a
 *        valid statement exists. A failed execution terminates the whole
 *        process with EXIT_FAILURE.
 * @param ptsk task descriptor handed in by the pool; ptsk->task_arg must
 *             point to the worker's struct __stpool_parameter.
 * @retval None.
 */
void MYAPP_TASK(struct sttask *ptsk)
{
    struct __stpool_parameter* tppara = (struct __stpool_parameter*)(ptsk->task_arg);
    /* Worst case INSERT: ~50 bytes of fixed text + table name (63) +
     * index (20 digits) + name (31) + code (STR_LEN). */
    char mssqlbuf[STR_LEN + 256];
    int sqllen = -1;

    if (tppara == NULL || tppara->mainthrpara == NULL) {
        fprintf(stderr, "APP_TASK/> {ERROR} task started without parameters.\n");
        return;
    }

    switch (tppara->mainthrpara->querytype) {
    case QTYPE_WRITE:
        sqllen = snprintf(mssqlbuf, sizeof mssqlbuf,
                          "INSERT INTO %s (idx, name, code) VALUES (%lu, '%s', '%s');",
                          tppara->mainthrpara->tabname,
                          tppara->columns.idx, tppara->columns.name, tppara->columns.code);
        break;
    case QTYPE_READ:
        sqllen = snprintf(mssqlbuf, sizeof mssqlbuf,
                          "SELECT name from %s WHERE idx=%lu;",
                          tppara->mainthrpara->tabname,
                          tppara->columns.idx);
        break;
    default:
        /* Unknown query type: there is nothing sensible to send. */
        break;
    }
    if (sqllen < 0 || (size_t)sqllen >= sizeof mssqlbuf) {
        fprintf(stderr, "APP_TASK/> {ERROR} worker[%lu]: no valid SQL statement could be built.\n",
                tppara->wkidx);
        return;
    }

    /* NOTE(review): the result of MSSQL_CONN_OPEN is not checked because its
     * return convention is not visible in this file. */
    MSSQL_CONN_OPEN(&(tppara->conn), tppara->mainthrpara->dblink);

    /* dbcmd stores the SQL text in the command buffer of the connection handle. */
    if (dbcmd(tppara->conn.dbprocess, mssqlbuf) == FAIL ||
        dbsqlexec(tppara->conn.dbprocess) == FAIL) {
        fprintf(stderr, "TDS/> sqlexec error!\n");
        dbclose(tppara->conn.dbprocess);
        exit(EXIT_FAILURE);
    }
    printf("worker[%lu] running...\r", tppara->wkidx);

    dbclose(tppara->conn.dbprocess);
    /* NOTE(review): dbexit() acts on the whole DB-Library; presumably it is
     * balanced by initialisation done inside MSSQL_CONN_OPEN - confirm that
     * calling it per task cannot affect connections of other workers. */
    dbexit();
}
/**
 * @brief Queue one MYAPP_TASK per workload and wait until all have finished.
 * @Notes Original author's notes: libstpool accepts at most 10000 workloads
 *        per pool, so large data sets would need to be processed in batches;
 *        on CentOS 7.5 the per-user thread limit is 15066. Batching is not
 *        implemented here; queuing stops at the first task the pool refuses.
 * @param thrpool         thread pool that runs the tasks.
 * @param total_workloads number of tasks to queue.
 * @param mainthrpara     settings shared by all tasks.
 * @return number of workloads that were actually handed to the pool.
 */
unsigned long activate_workloads(stpool_t *thrpool, unsigned long total_workloads,
                                 struct __main_thr_passin_para* mainthrpara)
{
    unsigned long processed_workloads = 0;
    struct __stpool_parameter *tppara = NULL;

    if (thrpool == NULL) {
        fprintf(stderr, "APP_TASK/> {ERROR} no thread pool available.\n");
        return 0;
    }

    /* Allocate the per-task parameter blocks. calloc checks the size
     * multiplication for overflow and zero-fills the connection handles. */
    if (total_workloads > 0) {
        tppara = calloc(total_workloads, sizeof *tppara);
        if (tppara == NULL) {
            fprintf(stderr, "APP_TASK/> {ERROR} cannot allocate parameters for [%lu] workloads.\n",
                    total_workloads);
            return 0;
        }
    }

    for (unsigned long i = 0; i < total_workloads; i++) {
        tppara[i].wkidx = i;
        tppara[i].mainthrpara = mainthrpara;
        /* Fill in the row values for this worker. */
        TP_PARA_CONSTRUCT(&tppara[i]);
        /* The fifth argument carries the data the task routine works on. */
        if (stpool_add_routine(thrpool, "workloads processing", MYAPP_TASK, NULL, &tppara[i], NULL) != 0) {
            fprintf(stderr, "APP_TASK/> {ERROR} workload [%lu] was rejected by the pool; queuing stopped.\n", i);
            break;
        }
        processed_workloads++;
    }
    printf("total [%lu] workloads added to ThreadPool.\n", processed_workloads);

    /* Block until every queued task has completed; the parameter blocks
     * must stay alive until then. */
    stpool_wait_all(thrpool, -1);
    printf("APP_TASK/> {INFO} total processed [%lu] data-items.\n", processed_workloads);

    /* Release the per-task parameter blocks. */
    free(tppara);
    return processed_workloads;
}
/**
 * @brief Show the connection settings and ask the user to confirm them.
 * @param dblink database link settings to display (host, port, database, user).
 * @param table  table name to display.
 * @return 1 when the user answers "yes", "Y" or "y"; 0 for any other answer
 *         or when no answer can be read.
 */
int confirm_dblink(struct __database_link dblink, char* table)
{
    char input[8] = "";

    printf("SQL Server Host IP: [%s]\n", dblink.SQL_SERVER_IP);
    printf("SQL Server Host PORT: [%s]\n", dblink.SQL_SERVER_PORT);
    printf("Database name: [%s]\n", dblink.db);
    printf("Username: [%s]\n", dblink.user);
    printf("Table name: [%s]\n", table);
    printf("Are you confirm these database link parameters?[yes],[no]: ");
    fflush(stdout); /* the prompt has no newline, so push it out before reading */

    /* The field width keeps the answer inside input[]; EOF or a read error
     * counts as "no". */
    if (scanf("%7s", input) != 1) {
        return 0;
    }
    if (strcmp(input, "yes") == 0 || strcmp(input, "Y") == 0 || strcmp(input, "y") == 0) {
        return 1;
    }
    return 0;
}
/* Long-option table for getopt_long(); each entry maps to the short option in .val. */
struct option options[] = {
    { .name = "help",       .has_arg = no_argument,       .flag = NULL, .val = 'h' },
    { .name = "query",      .has_arg = required_argument, .flag = NULL, .val = 'Q' },
    { .name = "host",       .has_arg = required_argument, .flag = NULL, .val = 'H' },
    { .name = "port",       .has_arg = required_argument, .flag = NULL, .val = 'P' },
    { .name = "user",       .has_arg = required_argument, .flag = NULL, .val = 'u' },
    { .name = "password",   .has_arg = required_argument, .flag = NULL, .val = 'p' },
    { .name = "database",   .has_arg = required_argument, .flag = NULL, .val = 'D' },
    { .name = "table",      .has_arg = required_argument, .flag = NULL, .val = 't' },
    { .name = "rows",       .has_arg = required_argument, .flag = NULL, .val = 'r' },
    { .name = "thread-max", .has_arg = required_argument, .flag = NULL, .val = 'x' },
    { .name = "thread-min", .has_arg = required_argument, .flag = NULL, .val = 'y' },
    /* all-zero terminator entry */
    { .name = NULL,         .has_arg = 0,                 .flag = NULL, .val = 0 }
};
/*
 * Text printed by main() for -h/--help: one line per command-line option,
 * followed by two usage examples framed by separator lines.
 * NOTE(review): the first usage example contains what looks like a real
 * password - confirm it is a throw-away test credential.
 */
const char* help_info =
/* option summary */
" -h , --help help information\n"
" -Q , --query SQL Query type.[INSERT, SELECT]\n"
" -H , --host SQL Server host IP\n"
" -P , --port SQL Server host PORT\n"
" -D , --database database name\n"
" -u , --user username\n"
" -p , --password password\n"
" -t , --table table name\n"
" -r , --rows number of rows need insert\n"
" -x , --thread-max maximum active threads in POOL\n"
" -y , --thread-min minimum active threads in POOL\n"
/* usage examples */
" --------------------------------------------------------------------------------------\n"
" Usage: ./mssqltool --query INSERT -H 172.24.1.122 -P 1433 -D testDB -u sa -p 4aKadedr -t Table_1 -r 10000\n"
" ./mssqltool --query INSERT -t Table_1 -r 10000\n"
" --------------------------------------------------------------------------------------\n";
/* Parse a base-10 integer option value in [min_value, INT_MAX]; returns 0 on success, -1 on bad input. */
static int parse_int_option(const char *text, int min_value, int *out)
{
    /* INT_MAX is derived here instead of taken from <limits.h>, because that
     * header would clash with this file's own CHAR_MIN/CHAR_MAX macros. */
    const long int_max = (long)(~0U >> 1);
    char *end = NULL;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE) {
        return -1;
    }
    if (value < (long)min_value || value > int_max) {
        return -1;
    }
    *out = (int)value;
    return 0;
}

/**
 * @brief Entry point: parse the command line, confirm the connection
 *        settings, then run the requested statement once per row on a
 *        thread pool.
 * @param argc argument count.
 * @param argv argument vector; see help_info for the accepted options.
 * @return 0 on success, 1 after printing help, 2 when the user declines the
 *         settings, 3 on invalid or missing arguments, 4 when the thread
 *         pool cannot be created.
 */
int main(int argc, char* argv[])
{
    struct __database_link dblink;
    /* NOTE(review): default server address and credentials are hard-coded;
     * consider requiring them on the command line. */
    DATABASE_LINK_CONSTRUCT(&dblink, "172.24.1.122", 1433,
                            "sa", "4aKadedr",
                            "testDB");

    struct __main_thr_passin_para mainthrpara;
    mainthrpara.dblink = &dblink;
    /* Start from defined values so missing -Q/-t options can be detected. */
    mainthrpara.tabname[0] = '\0';
    mainthrpara.tabcols = NULL;
    mainthrpara.querytype = 0; /* neither QTYPE_WRITE nor QTYPE_READ */

    /* Thread pool set-up. */
    stpool_t *thrpool;
    long eCAPs;
    MSG_log_set_level(LOG_ERR);
    eCAPs = eCAP_F_DYNAMIC|eCAP_F_SUSPEND|eCAP_F_THROTTLE|eCAP_F_ROUTINE|
            eCAP_F_DISABLEQ|eCAP_F_PRIORITY|eCAP_F_WAIT_ALL;

    int threads_max = 500, threads_min = 300, numOfWkCnt = 5000;
    int n = 0;
    while (1) {
        n = getopt_long(argc, argv, "hQ:H:P:u:p:D:t:r:x:y:", options, NULL);
        if (n < 0)
            break;
        switch (n) {
        case 'Q':
            if (strncmp(optarg, "INSERT", 6) == 0) {
                mainthrpara.querytype = QTYPE_WRITE;
            } else if (strncmp(optarg, "SELECT", 6) == 0) {
                mainthrpara.querytype = QTYPE_READ;
            } else {
                fprintf(stderr, "mssqltool: unknown query type '%s' (expected INSERT or SELECT)\n", optarg);
                return 3;
            }
            break;
        /* NOTE(review): the dblink fields are declared outside this file, so
         * their capacity is unknown here and the copies below stay unbounded;
         * bound them once the field sizes are confirmed. */
        case 'H':
            strcpy(dblink.SQL_SERVER_IP, optarg);
            break;
        case 'P':
            strcpy(dblink.SQL_SERVER_PORT, optarg);
            break;
        case 'D':
            strcpy(dblink.db, optarg);
            break;
        case 'u':
            strcpy(dblink.user, optarg);
            break;
        case 'p':
            strcpy(dblink.pass, optarg);
            break;
        case 't': {
            int len = snprintf(mainthrpara.tabname, sizeof mainthrpara.tabname, "%s", optarg);
            if (len < 0 || (size_t)len >= sizeof mainthrpara.tabname) {
                fprintf(stderr, "mssqltool: table name too long (max %lu characters)\n",
                        (unsigned long)(sizeof mainthrpara.tabname - 1));
                return 3;
            }
            break;
        }
        case 'r':
            if (parse_int_option(optarg, 0, &numOfWkCnt) != 0) {
                fprintf(stderr, "mssqltool: invalid value for --rows: '%s'\n", optarg);
                return 3;
            }
            break;
        case 'x':
            if (parse_int_option(optarg, 1, &threads_max) != 0) {
                fprintf(stderr, "mssqltool: invalid value for --thread-max: '%s'\n", optarg);
                return 3;
            }
            break;
        case 'y':
            if (parse_int_option(optarg, 0, &threads_min) != 0) {
                fprintf(stderr, "mssqltool: invalid value for --thread-min: '%s'\n", optarg);
                return 3;
            }
            break;
        case 'h':
            printf("%s", help_info);
            return 1;
        default:
            break;
        }
    }

    if (mainthrpara.querytype == 0) {
        fprintf(stderr, "mssqltool: --query INSERT|SELECT is required\n%s", help_info);
        return 3;
    }
    if (mainthrpara.tabname[0] == '\0') {
        fprintf(stderr, "mssqltool: --table is required\n%s", help_info);
        return 3;
    }
    if (threads_min > threads_max) {
        fprintf(stderr, "mssqltool: --thread-min (%d) exceeds --thread-max (%d); using %d for both\n",
                threads_min, threads_max, threads_max);
        threads_min = threads_max;
    }

    MSSQL_CONN conn;
    if (!confirm_dblink(dblink, mainthrpara.tabname)) {
        return 2;
    }
    /* NOTE(review): unbounded write into dblink.SQL_SERVER; its capacity is
     * not visible in this file. */
    sprintf(dblink.SQL_SERVER, "%s:%s", dblink.SQL_SERVER_IP, dblink.SQL_SERVER_PORT);
    MSSQL_CONN_OPEN(&conn, &dblink);
    mssql_query_column_name(&conn, mainthrpara.tabname);
    mainthrpara.tabcols = &(conn.result); /* column information */

    int exit_code = 0;
    thrpool = stpool_create("MSSQL-INSERT", eCAPs, threads_max, threads_min, 0, 1);
    if (!thrpool) {
        fprintf(stderr, "ThreadPool/> {ERROR} Application-Workers thread pool initialize failed!\n");
        exit_code = 4; /* never hand a NULL pool to the workload runner */
    } else {
        fprintf(stderr, "%s\n", stpool_stat_print(thrpool));
        /* Queue the workloads and wait for them to finish. */
        activate_workloads(thrpool, (unsigned long)numOfWkCnt, &mainthrpara);
    }

    /* Release the memory of the column-name result set. */
    MSSQL_CONN_RSTSET_CLEAN(&(conn.result));

    /* Release the thread pool. */
    if (thrpool != NULL) {
        stpool_release(thrpool);
        fprintf(stderr, "ThreadPool/> {INFO} Current Thread-pool release.\n");
    } else {
        fprintf(stderr, "ThreadPool/> {INFO} Current Thread-pool is null.\n");
    }
    return exit_code;
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/jinquan711/freetds-samples.git
git@gitee.com:jinquan711/freetds-samples.git
jinquan711
freetds-samples
freetds-samples
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891