diff --git a/examples/tuning/kafka/README.md b/examples/tuning/kafka/README.md index a410502697146ae0cea3fdd79fc215f3b036c483..dd49d3454db65b432e631cd1250381f47cbf6943 100644 --- a/examples/tuning/kafka/README.md +++ b/examples/tuning/kafka/README.md @@ -24,3 +24,23 @@ Running `prepare.sh` to deploy, follow the hint. The script will create a new ss - `launcher_on_server.sh`, launch the `benchmark_on_client.sh` on the client, and copy the result log back to the server. - `benchmark_on_client.sh`, do Kafka producer and consumer test, generate result log. - `manager.sh`, control the Kafka service. + +## Tuning 🔧 + +After running the deployment script, copy the server-side YAML which you want to use to **/etc/atuned/tuning/** + +### Start to tuning + +atune-adm tuning --project PROJECT --detail ./CLIENT.yaml + +eg: atune-adm tuning --project kafka --detail ./kafka.yaml + +### Restore the environment + +atune-adm tuning --restore --project PROJECT +eg: atune-adm tuning --restore --project kafka + +## To contact ✉️ + +- Email: me@shangcode.cn +- Gitee: https://gitee.com/shangyingjie diff --git a/examples/tuning/kafka/kafka_client.yaml b/examples/tuning/kafka/kafka_client.yaml index 2c3737836240f99c194d1aea0e7e5472dceaa45e..4bddf0a1cc4d540d7699b11789934bede936c4e8 100644 --- a/examples/tuning/kafka/kafka_client.yaml +++ b/examples/tuning/kafka/kafka_client.yaml @@ -1,10 +1,10 @@ project: "kafka" engine: "gbrt" iterations: 100 -benchmark: "bash ./launcher_on_server.sh" +benchmark: "bash working_dir/launcher_on_server.sh" evaluations: - name: "QPS" info: - get: cat ./kafka_benchmark.log + get: "cat working_dir/kafka_benchmark.log" type: "negative" weight: 100 diff --git a/examples/tuning/kafka/kafka_server.yaml b/examples/tuning/kafka/kafka_server.yaml new file mode 100644 index 0000000000000000000000000000000000000000..3f3e862119ad7059cc97a250a40a0e74f31a2b74 --- /dev/null +++ b/examples/tuning/kafka/kafka_server.yaml @@ -0,0 +1,204 @@ +project: "kafka" +maxiterations: 100 
+startworkload: "bash working_dir/manager.sh --start"
+stopworkload: "bash working_dir/manager.sh --stop"
+object:
+    - name: "kafka.batch_size"
+      info:
+          desc: "Controls how many bytes of data to collect before sending messages to the Kafka broker."
+          get: grep 'batch.size' working_dir/config/server.properties | cut -d '=' -f 2
+          set: sed -i "s/batch.size=[[:digit:]]*/batch.size=$value/" working_dir/config/server.properties
+          needrestart: "true"
+          type: "discrete"
+          dtype: "int"
+          scope:
+              - 16384
+              - 1048576
+          step: 1024
+    - name: "kafka.buffer_memory"
+      info:
+          desc: "Total bytes of memory the producer can use to buffer records waiting to be sent to the server."
+          get: grep 'buffer.memory' working_dir/config/server.properties | cut -d '=' -f 2
+          set: sed -i "s/buffer.memory=[[:digit:]]*/buffer.memory=$value/" working_dir/config/server.properties
+          needrestart: "true"
+          type: "discrete"
+          dtype: "int"
+          scope:
+              - 33554432
+              - 67108864
+          step: 1048576
+    - name: "kafka.compression_type"
+      info:
+          desc: "Specify the final compression type for a given topic."
+          get: grep 'compression.type' working_dir/config/server.properties | cut -d '=' -f 2
+          set: sed -i "s/compression.type=[[:alnum:]]*/compression.type=$value/" working_dir/config/server.properties
+          needrestart: "true"
+          type: "discrete"
+          dtype: "string"
+          options:
+              - "gzip"
+              - "snappy"
+              - "lz4"
+              - "zstd"
+    - name: "kafka.delivery_timeout_ms"
+      info:
+          desc: "An upper bound on the time to report success or failure after a call to send() returns."
+          get: grep 'delivery.timeout.ms' working_dir/config/server.properties | cut -d '=' -f 2
+          set: sed -i "s/delivery.timeout.ms=[[:digit:]]*/delivery.timeout.ms=$value/" working_dir/config/server.properties
+          needrestart: "true"
+          type: "discrete"
+          dtype: "int"
+          scope:
+              - 100000
+              - 120000
+          step: 1000
+    - name: "kafka.linger_ms"
+      info:
+          desc: "The maximum time to buffer data in asynchronous mode."
+ get: grep 'linger.ms' working_dir/config/server.properties | cut -d '=' -f 2 + set: sed -i "s/linger.ms=[[:digit:]]*/linger.ms=$value/" working_dir/config/server.properties + needrestart: "true" + type: "continuous" + scope: + - 0 + - 30 + - name: "kafka.log_segment_bytes" + info: + desc: "The maximum size of a single log file" + get: grep 'log.segment.bytes' working_dir/config/server.properties | cut -d '=' -f 2 + set: sed -i "s/log.segment.bytes=[[:digit:]]*/log.segment.bytes=$value/" working_dir/config/server.properties + needrestart: "true" + type: "discrete" + dtype: "int" + scope: + - 1048576 + - 1073741824 + step: 1048576 + - name: "kafka.log_flush_interval_messages" + info: + desc: "The number of messages to accept before forcing a flush of data to disk." + get: grep 'log.flush.interval.messages' working_dir/config/server.properties | awk -F '=' '{print $2}' + set: sed -i 's/^log.flush.interval.messages.*$/log.flush.interval.messages=$value/g' working_dir/config/server.properties + needrestart: "true" + type: "discrete" + scope: + - 5000 + - 50000 + step: 1000 + dtype: "int" + - name: "kafka.log_flush_interval_ms" + info: + desc: "The maximum amount of time a message can sit in a log before we force a flush." + get: grep 'log.flush.interval.ms' working_dir/config/server.properties | awk -F '=' '{print $2}' + set: sed -i 's/^log.flush.interval.ms.*$/log.flush.interval.ms=$value/g' working_dir/config/server.properties + needrestart: "true" + type: "discrete" + scope: + - 500 + - 5000 + step: 100 + dtype: "int" + - name: "kafka.max_in_flight_requests_per_connection" + info: + desc: "The maximum number of unacknowledged requests the client will send on a single connection before blocking." 
+ get: grep 'max.in.flight.requests.per.connection' working_dir/config/server.properties | cut -d '=' -f 2 + set: sed -i "s/max.in.flight.requests.per.connection=[[:digit:]]*/max.in.flight.requests.per.connection=$value/" working_dir/config/server.properties + needrestart: "true" + type: "continuous" + scope: + - 1 + - 5 + - name: "kafka.message_max_bytes" + info: + desc: "The maximum message size the broker accepts." + get: grep 'message.max.bytes' working_dir/config/server.properties | cut -d '=' -f 2 + set: sed -i "s/message.max.bytes=[[:digit:]]*/message.max.bytes=$value/" working_dir/config/server.properties + needrestart: "true" + type: "discrete" + dtype: "int" + scope: + - 0 + - 10485760 + step: 1048576 + - name: "kafka.num_io_threads" + info: + desc: "The number of threads that the server uses for processing requests, which may include disk I/O." + get: grep 'num.io.threads' working_dir/config/server.properties | cut -d '=' -f 2 + set: sed -i "s/num.io.threads=[[:digit:]]*/num.io.threads=$value/" working_dir/config/server.properties + needrestart: "true" + type: "continuous" + dtype: "int" + scope: + - 1 + - 16 + - name: "kafka.num_network_threads" + info: + desc: "The number of threads that the server uses for receiving requests from the network and sending responses to the network" + get: grep 'num.network.threads' working_dir/config/server.properties | cut -d '=' -f 2 + set: sed -i "s/num.network.threads=[[:digit:]]*/num.network.threads=$value/" working_dir/config/server.properties + needrestart: "true" + type: "continuous" + dtype: "int" + scope: + - 1 + - 10 + - name: "kafka.replica_fetch_max.bytes" + info: + desc: "The number of bytes of messages to attempt to fetch for each partition." 
+ get: grep 'replica.fetch.max.bytes' working_dir/config/server.properties | cut -d '=' -f 2 + set: sed -i "s/replica.fetch.max.bytes=[[:digit:]]*/replica.fetch.max.bytes=$value/" working_dir/config/server.properties + needrestart: "true" + type: "discrete" + dtype: "int" + scope: + - 1048576 + - 1073741824 + step: 1048576 + - name: "kafka.socket_send_buffer_bytes" + info: + desc: "The send buffer (SO_SNDBUF) used by the socket server." + get: grep 'socket.send.buffer.bytes' working_dir/config/server.properties | awk -F '=' '{print $2}' + set: sed -i 's/^socket.send.buffer.bytes.*$/socket.send.buffer.bytes=$value/g' working_dir/config/server.properties + needrestart: "true" + type: "discrete" + scope: + - 10240 + - 1024000 + step: 10240 + dtype: "int" + - name: "kafka.socket_receive_buffer_bytes" + info: + desc: "The receive buffer (SO_RCVBUF) used by the socket server." + get: grep 'socket.receive.buffer.bytes' working_dir/config/server.properties | awk -F '=' '{print $2}' + set: sed -i 's/^socket.receive.buffer.bytes.*$/socket.receive.buffer.bytes=$value/g' working_dir/config/server.properties + needrestart: "true" + type: "discrete" + scope: + - 10240 + - 1024000 + step: 10240 + dtype: "int" + - name: "kafka.socket_request_max_bytes" + info: + desc: "The maximum size of a request that the socket server will accept (protection against OOM)." + get: grep 'socket.request.max.bytes' working_dir/config/server.properties | awk -F '=' '{print $2}' + set: sed -i 's/^socket.request.max.bytes.*$/socket.request.max.bytes=$value/g' working_dir/config/server.properties + needrestart: "true" + type: "discrete" + scope: + - 1048576 + - 1048576000 + step: 1048576 + dtype: "int" + - name: "kafka.zookeeper_connection_timeout_ms" + info: + desc: "Timeout in ms for connecting to zookeeper." 
+ get: grep 'zookeeper.connection.timeout.ms' working_dir/config/server.properties | awk -F '=' '{print $2}' + set: sed -i 's/^zookeeper.connection.timeout.ms.*$/zookeeper.connection.timeout.ms=$value/g' working_dir/config/server.properties + needrestart: "true" + type: "discrete" + scope: + - 500 + - 50000 + step: 100 + dtype: "int" diff --git a/examples/tuning/kafka/launcher_on_server.sh b/examples/tuning/kafka/launcher_on_server.sh index 85190c9ab70c173bc0f15dfdaf88c20cc23619d6..62f420b43ee75dabf6aff3d2c5c2838fdb6c3091 100644 --- a/examples/tuning/kafka/launcher_on_server.sh +++ b/examples/tuning/kafka/launcher_on_server.sh @@ -19,13 +19,12 @@ echo 'launch benchmark' -cd "$(dirname "$0")" - +WORKING_DIR="will be replaced after running prepare.sh" KAFKA_CLIENT_IP='will be replaced after running prepare.sh' if [ -f "./client_ssh_key" ]; then - ssh -i client_ssh_key root@"$KAFKA_CLIENT_IP" -t "/usr/bin/bash /root/benchmark_on_client.sh" - scp -i client_ssh_key root@"$KAFKA_CLIENT_IP":/root/kafka_benchmark.log ./ + ssh -i $WORKING_DIR/client_ssh_key root@"$KAFKA_CLIENT_IP" -t "/usr/bin/bash /root/benchmark_on_client.sh" + scp -i $WORKING_DIR/client_ssh_key root@"$KAFKA_CLIENT_IP":/root/kafka_benchmark.log ./ else ssh root@"$KAFKA_CLIENT_IP" -t "/usr/bin/bash /root/benchmark_on_client.sh" scp root@"$KAFKA_CLIENT_IP":/root/kafka_benchmark.log ./ diff --git a/examples/tuning/kafka/manager.sh b/examples/tuning/kafka/manager.sh index 7ccff7eee2a6b18bee7581283baeb4d1c0f6d1e9..9621484fe1c24b06ed8be270bd61707f4f47be5a 100755 --- a/examples/tuning/kafka/manager.sh +++ b/examples/tuning/kafka/manager.sh @@ -27,7 +27,7 @@ USAGE="Usage: task_manager [OPTION] --delete-topic delete topic: kafka-benchmark --help output this usage" -KAFKA_DIR=./kafka_2.13-3.2.0 +KAFKA_DIR="will be replaced after running prepare.sh" ZOOKEEPER_SERVER_START_SCRPIT=$KAFKA_DIR/bin/zookeeper-server-start.sh ZOOKEEPER_SERVER_STOP_SCRPIT=$KAFKA_DIR/bin/zookeeper-server-stop.sh 
ZOOKEEPER_CONFIG=$KAFKA_DIR/config/zookeeper.properties @@ -36,8 +36,6 @@ KAFKA_SERVER_STOP_SCRPIT=$KAFKA_DIR/bin/kafka-server-stop.sh KAFKA_SERVER_CONFIG=$KAFKA_DIR/config/server.properties KAFKA_TOPIC_SCRIPT=$KAFKA_DIR/bin/kafka-topics.sh -cd "$(dirname "$0")" - if [[ $# -gt 0 ]]; then case $1 in --start) diff --git a/examples/tuning/kafka/prepare.sh b/examples/tuning/kafka/prepare.sh index d67818e445336e3cc4284649cb73281a16f23dde..e2e2c0e6bdfb305a7b1c5f8cab885e966c6ef067 100644 --- a/examples/tuning/kafka/prepare.sh +++ b/examples/tuning/kafka/prepare.sh @@ -13,12 +13,22 @@ ############################################# # @Author : shangyingjie # @Contact : yingjie@isrc.iscas.ac.cn -# @Date : 2022/5/21 +# @Date : 2022/6/20 # @License : Mulan PSL v2 # @Desc : Kafka deployment script # ############################################# -cd "$(dirname "$0")" +while [[ -z "$working_directory" ]]; do + read -p "please specify working directory(absolute path):" working_directory +done + +while [[ -z "$atune_directory" ]]; do + read -p "please specify atune directory(absolute path):" atune_directory +done + +kafka_dir=$working_directory/kafka +kafka_server_config=$kafka_dir/config/server.properties +kafka_tuning_yaml=$atune_directory/examples/tuning/kafka while [[ -z "$kafka_server_ip" ]]; do read -p "please input kafka-server(localhost) IP:" kafka_server_ip @@ -30,22 +40,25 @@ done read -p "enter y to set up new ssh keys for the kafka-client or skip if already deployed:" setup_new_ssh_key if [ "$setup_new_ssh_key" = "y" ]; then - ssh-keygen -t rsa -f client_ssh_key -N "" - ssh-copy-id -i client_ssh_key root@"$kafka_client_ip" + ssh-keygen -t rsa -f $working_directory/client_ssh_key -N "" + ssh-copy-id -i $working_directory/client_ssh_key root@"$kafka_client_ip" fi -echo "update launcher_on_server.sh and benchmark_on_client.sh with value just entered..." 
-sed -i "s#KAFKA_SERVER_IP=.*#KAFKA_SERVER_IP=$kafka_server_ip#g" benchmark_on_client.sh
-sed -i "s#KAFKA_CLIENT_IP=.*#KAFKA_CLIENT_IP=$kafka_client_ip#g" launcher_on_server.sh
+echo "update manager.sh, launcher_on_server.sh and benchmark_on_client.sh with value just entered..."
+sed -i "s#KAFKA_DIR=.*#KAFKA_DIR=$kafka_dir#g" $working_directory/manager.sh
+sed -i "s#working_dir#$working_directory#g" $kafka_tuning_yaml/kafka_client.yaml
+sed -i "s#working_dir#$working_directory#g" $kafka_tuning_yaml/kafka_server.yaml
+sed -i "s#KAFKA_SERVER_IP=.*#KAFKA_SERVER_IP=$kafka_server_ip#g" $working_directory/benchmark_on_client.sh
+sed -i -e "s#WORKING_DIR=.*#WORKING_DIR=$working_directory#g" -e "s#KAFKA_CLIENT_IP=.*#KAFKA_CLIENT_IP=$kafka_client_ip#g" $working_directory/launcher_on_server.sh
 echo "copy benchmark script to kafka-client" "$kafka_client_ip"
 if [ "$setup_new_ssh_key" = "y" ]; then
-    scp -i client_ssh_key benchmark_on_client.sh root@"$kafka_client_ip":/root/
+    scp -i $working_directory/client_ssh_key $working_directory/benchmark_on_client.sh root@"$kafka_client_ip":/root/
 else
-    scp benchmark_on_client.sh root@"$kafka_client_ip":/root/
+    scp $working_directory/benchmark_on_client.sh root@"$kafka_client_ip":/root/
 fi
-if [ -f "./kafka_2.13-3.2.0.tgz" ]; then
+if [ -f "$working_directory/kafka_2.13-3.2.0.tgz" ]; then
 echo "Kafka release exist on local."
 else
 echo "download Kafka release on kafka-server."
@@ -53,24 +66,32 @@ else
 fi
 echo "copy Kafka release to kafka-client."
-scp ./kafka_2.13-3.2.0.tgz root@"$kafka_client_ip":/root/
+scp -i $working_directory/client_ssh_key $working_directory/kafka_2.13-3.2.0.tgz root@"$kafka_client_ip":/root/
 echo "install tar and jdk runtime on kafka-server"
 yum install -y tar java
 echo "install tar and jdk runtime on kafka-client"
-ssh -i client_ssh_key -t root@"$kafka_client_ip" "yum install -y tar java;"
+ssh -i $working_directory/client_ssh_key -t root@"$kafka_client_ip" "yum install -y tar java;"
 echo "extract Kafka release tarball on kafka-server."
-tar -xzf ./kafka_2.13-3.2.0.tgz
+tar -xzf $working_directory/kafka_2.13-3.2.0.tgz -C $working_directory
+mv $working_directory/kafka_2.13-3.2.0/ $working_directory/kafka
+
 echo "extract Kafka release tarball on kafka-client."
-ssh -i client_ssh_key -t root@"$kafka_client_ip" "tar -xzf /root/kafka_2.13-3.2.0.tgz"
+ssh -i $working_directory/client_ssh_key -t root@"$kafka_client_ip" "tar -xzf /root/kafka_2.13-3.2.0.tgz"
 echo "modify kafka server configuration"
-echo "listeners=PLAINTEXT://0.0.0.0:9092 \
-advertised.listeners=PLAINTEXT://$KAFKA_SERVER_IP:9092" >>./kafka_2.13-3.2.0/config/server.properties
+echo "listeners=PLAINTEXT://0.0.0.0:9092
+advertised.listeners=PLAINTEXT://$kafka_server_ip:9092" >>$kafka_server_config
+echo "# A-Tune tuning parameters
+message.max.bytes=1048588
+buffer.memory=33554432
+linger.ms=0
+max.in.flight.requests.per.connection=5
+delivery.timeout.ms=120000" >>$kafka_server_config
 echo "start kafka service"
-bash ./manager.sh --start
+bash $working_directory/manager.sh --start
 echo "create topic for benchmark"
-bash ./manager.sh --create-topic