# crock

**Repository Path**: surongyou/crock

## Basic Information

- **Project Name**: crock
- **Description**: spark-sql scaffolding
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 2
- **Created**: 2024-08-28
- **Last Updated**: 2024-08-28

## Categories & Tags

**Categories**: Uncategorized
**Tags**: None

## README

## crock

#### Main class

```
SqlSubmit.java
```

#### How to submit

```shell
spark-submit \
  --class com.crock.SqlSubmit \
  --name "xxxx" \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 15 \
  --driver-memory 4g \
  --executor-memory 10g \
  --executor-cores 6 \
  --queue root \
  crock.jar -c 5 -l -q "set ....;insert into ...."
```

#### Parameters

- `-f`: path to a SQL script file; OSS, HDFS, and local paths are supported.
- `-q`: inline SQL script; separate multiple statements with semicolons. Currently only insert, create, set, and drop statements are supported.
- `-c`: merge small files when writing to Hive.
- `-l`: extract lineage and write it to a graph database.
- `-p`: configuration for reading MySQL data in parallel.

### Supported data sources

#### Reading and writing MySQL

Read:

```sql
set mysql_url =ip:3306/database;
set mysql_user =user_name;
set mysql_password =password;
set mysql_tables =table_name1,table_name2;
set model =read;
```

Separate multiple tables from the same database with commas. Once this is configured, Spark registers each table as a view, so the table names can be used directly in SQL for queries and ETL.

Reading from MySQL defaults to a parallelism of 1. To speed up reads, pass a parallel-read configuration with `-p`, for example:

```
-p '{column:create_time,startDate:2022-08-29,endDate:yesterday,parallelism:100}'
```

or

```
-p '{column:id,lowerBound:0,upperBound:30000000,parallelism:100}'
```

Write:

```sql
set mysql_url =ip:3306/databases;
set mysql_user =user_name;
set mysql_password =password;
set mysql_tables =table_name3;
set model =write_upsert/write_append/write_overwrite;
```

- `write_upsert`: insert or update according to the table's unique constraint
- `write_append`: append
- `write_overwrite`: overwrite the data (the table itself is not dropped)

Example:

```sql
insert into table_name3 select * from table_name1 t1 join table_name2 t2 on t1.id=t2.id
```

#### Writing to Kafka

```sql
set kafka_bootstrap_servers =ip:port;
set kafka_topic =topic;
```

Example (the JSON payload is assembled in SQL as the `value` column; one way to build it is sketched in the appendix at the end of this README):

```sql
insert into topic select <json assembled from columns> as value from table_name1 t1 join table_name2 t2 on t1.id=t2.id
```

#### Writing to Elasticsearch

```sql
set es_nodes =ip;
set es_port =9200;
set es_nodes_wan_only =true;
set es_net_http_auth_user =username;
set es_net_http_auth_pass =password;
set transport_type =security3;
set http_type =security3;
set es_mapping_id =mapping_id;
set es_resource =index;
```

Example:

```sql
insert into index select * from table_name1 t1 join table_name2 t2 on t1.id=t2.id
```

#### Reading and writing Redis

Read:

```sql
set redis_host =ip;
set redis_port =port;
set redis_password =password;
set redis_db =db;
set redis_key =key;
```

Example: read from MySQL, replace `{redis_value}` in the SQL with the value stored in Redis, and write the result to `test`:

```sql
insert into test select concat('{\"report_id\"', ':', report_id, '}') as value from table_name1 where update_time >= '{redis_value}' and flag=1 group by report_id;
```

#### Reading and writing Hive

```sql
insert overwrite table ... select ...
```

A complete end-to-end script example is sketched in the appendix below.
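#### Appendix: end-to-end script sketch

A minimal sketch of a complete script that registers one MySQL table as a view and writes it into Hive. The database `sales_db`, table `orders`, Hive table `dw.orders_daily`, connection details, and file path are all hypothetical placeholders; only the `set` keys and statement types come from the sections above. The `--` comments are illustrative and may need to be removed depending on how the script is parsed.

```sql
-- read mode: register the MySQL table `orders` as a view (hypothetical connection details)
set mysql_url =127.0.0.1:3306/sales_db;
set mysql_user =etl_user;
set mysql_password =secret;
set mysql_tables =orders;
set model =read;

-- write yesterday's orders into a Hive table (hypothetical table and column names)
insert overwrite table dw.orders_daily
select id, customer_id, amount, create_time
from orders
where create_time >= date_sub(current_date(), 1);
```

Saved, for example, as `hdfs:///scripts/orders_daily.sql` (a hypothetical path), it could be submitted with `-f hdfs:///scripts/orders_daily.sql` instead of an inline `-q` string, optionally adding `-p '{column:id,lowerBound:0,upperBound:30000000,parallelism:100}'` to read the MySQL table in parallel.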
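#### Appendix: assembling JSON for Kafka

The Kafka example above leaves the JSON assembly as a placeholder. Assuming the script is executed by Spark SQL, the built-in `to_json` and `named_struct` functions are one way to build the `value` column; the topic, bootstrap servers, and column names below are hypothetical.

```sql
set kafka_bootstrap_servers =127.0.0.1:9092;
set kafka_topic =orders_events;

-- build a JSON document per row and write it to the topic as the value column
insert into orders_events
select to_json(named_struct('id', t1.id, 'amount', t1.amount, 'customer', t2.name)) as value
from table_name1 t1 join table_name2 t2 on t1.id = t2.id;
```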