From b6983eb342cdf12f8cf577642623a7d5c28966eb Mon Sep 17 00:00:00 2001 From: zhiwei_yang Date: Fri, 15 May 2020 17:04:39 +0800 Subject: [PATCH 01/28] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=A4=9A=E4=BD=99?= =?UTF-8?q?=E7=9A=84=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../eagle/web/sql/execute/KafkaSqlParser.java | 3 +- .../kafka/eagle/consumer/JTestConsumer.java | 29 ------ .../eagle/factory/TestBrokerServiceImpl.java | 40 -------- .../eagle/factory/TestKafkaMetricsImpl.java | 40 -------- .../eagle/factory/TestKafkaProvider.java | 35 ------- .../eagle/factory/TestKafkaServiceImpl.java | 96 ------------------- .../kafka/eagle/factory/TestMx4jProvider.java | 40 -------- .../kafka/eagle/factory/TestZkProvider.java | 35 ------- .../kafka/eagle/factory/TestZkTopic.java | 40 -------- ...erService.java => ClusterServiceTest.java} | 10 +- ...Service.java => DashboardServiceTest.java} | 11 ++- .../kafka/eagle/service/TestAlarmService.java | 33 ------- .../eagle/service/TestConsumerService.java | 33 ------- .../kafka/eagle/service/TestTopicService.java | 33 ------- .../{ipc/TestKSql.java => sql/KSqlTest.java} | 4 +- ...tKafkaParser.java => KafkaParserTest.java} | 30 ++++-- ...entUtils.java => HttpClientUtilsTest.java} | 23 ++--- ...Utils.java => QuartzManagerUtilsTest.java} | 13 +-- .../smartloli/kafka/eagle/util/TestJob.java | 41 -------- .../kafka/eagle/util/TestNetUtils.java | 38 -------- .../kafka/eagle/util/TestZKUtils.java | 38 -------- 21 files changed, 59 insertions(+), 606 deletions(-) delete mode 100644 src/test/java/org/smartloli/kafka/eagle/consumer/JTestConsumer.java delete mode 100644 src/test/java/org/smartloli/kafka/eagle/factory/TestBrokerServiceImpl.java delete mode 100644 src/test/java/org/smartloli/kafka/eagle/factory/TestKafkaMetricsImpl.java delete mode 100644 src/test/java/org/smartloli/kafka/eagle/factory/TestKafkaProvider.java delete mode 100644 src/test/java/org/smartloli/kafka/eagle/factory/TestKafkaServiceImpl.java delete mode 100644 src/test/java/org/smartloli/kafka/eagle/factory/TestMx4jProvider.java delete mode 100644 src/test/java/org/smartloli/kafka/eagle/factory/TestZkProvider.java delete mode 100644 src/test/java/org/smartloli/kafka/eagle/factory/TestZkTopic.java rename src/test/java/org/smartloli/kafka/eagle/service/{TestClusterService.java => ClusterServiceTest.java} (76%) rename src/test/java/org/smartloli/kafka/eagle/service/{TestDashboardService.java => DashboardServiceTest.java} (73%) delete mode 100644 src/test/java/org/smartloli/kafka/eagle/service/TestAlarmService.java delete mode 100644 src/test/java/org/smartloli/kafka/eagle/service/TestConsumerService.java delete mode 100644 src/test/java/org/smartloli/kafka/eagle/service/TestTopicService.java rename src/test/java/org/smartloli/kafka/eagle/{ipc/TestKSql.java => sql/KSqlTest.java} (97%) rename src/test/java/org/smartloli/kafka/eagle/sql/{TestKafkaParser.java => KafkaParserTest.java} (63%) rename src/test/java/org/smartloli/kafka/eagle/util/{TestHttpClientUtils.java => HttpClientUtilsTest.java} (90%) rename src/test/java/org/smartloli/kafka/eagle/util/{TestQuartzManagerUtils.java => QuartzManagerUtilsTest.java} (74%) delete mode 100644 src/test/java/org/smartloli/kafka/eagle/util/TestJob.java delete mode 100644 src/test/java/org/smartloli/kafka/eagle/util/TestNetUtils.java delete mode 100644 src/test/java/org/smartloli/kafka/eagle/util/TestZKUtils.java diff --git a/src/main/java/org/smartloli/kafka/eagle/web/sql/execute/KafkaSqlParser.java b/src/main/java/org/smartloli/kafka/eagle/web/sql/execute/KafkaSqlParser.java index eb271fe..29d4fea 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/sql/execute/KafkaSqlParser.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/sql/execute/KafkaSqlParser.java @@ -91,8 +91,7 @@ public class KafkaSqlParser { } catch (Exception e) { status.put("error", true); status.put("msg", e.getMessage()); - e.printStackTrace(); - LOG.error("Execute sql to query kafka topic has error,msg is " + e.getMessage()); + LOG.error("Execute sql to query kafka topic has error", e); } return status.toJSONString(); } diff --git a/src/test/java/org/smartloli/kafka/eagle/consumer/JTestConsumer.java b/src/test/java/org/smartloli/kafka/eagle/consumer/JTestConsumer.java deleted file mode 100644 index 969592d..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/consumer/JTestConsumer.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartloli.kafka.eagle.consumer; - -/** -* TODO -* -* @author smartloli. -* -* Created by Jan 15, 2019 -*/ -public class JTestConsumer { - -} diff --git a/src/test/java/org/smartloli/kafka/eagle/factory/TestBrokerServiceImpl.java b/src/test/java/org/smartloli/kafka/eagle/factory/TestBrokerServiceImpl.java deleted file mode 100644 index 860b5f8..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/factory/TestBrokerServiceImpl.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartloli.kafka.eagle.factory; - - -import org.smartloli.kafka.eagle.web.service.BrokerService; -import org.smartloli.kafka.eagle.web.service.impl.BrokerServiceImpl; - -/** - * TODO - * - * @author smartloli. - * - * Created by Jun 14, 2019 - */ -public class TestBrokerServiceImpl { - - private static final BrokerService brokerService = new BrokerServiceImpl(); - - public static void main(String[] args) { - long count = brokerService.partitionNumbers("cluster1", "kv-test2"); - System.out.println("count: " + count); - } - -} diff --git a/src/test/java/org/smartloli/kafka/eagle/factory/TestKafkaMetricsImpl.java b/src/test/java/org/smartloli/kafka/eagle/factory/TestKafkaMetricsImpl.java deleted file mode 100644 index 8fcd974..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/factory/TestKafkaMetricsImpl.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartloli.kafka.eagle.factory; - -import org.apache.kafka.clients.admin.ConfigEntry; -import org.smartloli.kafka.eagle.web.service.KafkaMetricsService; -import org.smartloli.kafka.eagle.web.service.impl.KafkaMetricsServiceImpl; - -/** - * TODO - * - * @author smartloli. - * - * Created by Jun 9, 2019 - */ -public class TestKafkaMetricsImpl { - - private static final KafkaMetricsService kafkaMetric = new KafkaMetricsServiceImpl(); - - public static void main(String[] args) { - ConfigEntry configEntry = new ConfigEntry("cleanup.policy", "122ss"); - String target = kafkaMetric.changeTopicConfig("cluster1", "kv-test2019", "ADD", configEntry); - System.out.println("target: " + target); - } -} diff --git a/src/test/java/org/smartloli/kafka/eagle/factory/TestKafkaProvider.java b/src/test/java/org/smartloli/kafka/eagle/factory/TestKafkaProvider.java deleted file mode 100644 index 4f83021..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/factory/TestKafkaProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartloli.kafka.eagle.factory; - -import kafka.admin.TopicCommand; - -/** - * Test Provider clazz. - * - * @author smartloli. - * - * Created by Jan 17, 2017 - */ -public class TestKafkaProvider { - public static void main(String[] args) { - String[] options = new String[] { "--alter", "--zookeeper", "slave01:2181","--partitions","6", "--topic", "KE_TTT_1200" }; - TopicCommand.main(options); - } - -} diff --git a/src/test/java/org/smartloli/kafka/eagle/factory/TestKafkaServiceImpl.java b/src/test/java/org/smartloli/kafka/eagle/factory/TestKafkaServiceImpl.java deleted file mode 100644 index 940c9cd..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/factory/TestKafkaServiceImpl.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * - */ -package org.smartloli.kafka.eagle.factory; - -import kafka.zk.KafkaZkClient; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.smartloli.kafka.eagle.web.constant.KafkaConstants; -import org.smartloli.kafka.eagle.web.service.KafkaService; -import org.smartloli.kafka.eagle.web.service.ZkService; -import org.smartloli.kafka.eagle.web.service.impl.KafkaServiceImpl; -import org.smartloli.kafka.eagle.web.service.impl.ZkServiceImpl; -import org.smartloli.kafka.eagle.web.util.KafkaResourcePoolUtils; -import scala.collection.JavaConversions; -import scala.collection.Seq; - -import java.util.*; - -/** - * TODO - * - * @author smartloli. - * - * Created by Mar 24, 2017 - */ -public class TestKafkaServiceImpl { - - private final String BROKER_TOPICS_PATH = "/brokers/topics"; - - private static final KafkaService kafkaService = new KafkaServiceImpl(); - - private static final ZkService zkService = new ZkServiceImpl(); - - public static void main(String[] args) { - - long logsize = kafkaService.getKafkaLogSize("cluster1", "kafka20191217", 0); - System.out.println(logsize); - - String res = kafkaService.getKafkaOffset("cluster1"); - System.out.println(res); - - Set partitionids = new HashSet<>(); - for (int i = 0; i < 10; i++) { - partitionids.add(i); - } - Map offsets = kafkaService.getKafkaOffset("cluster1", "kafka_app0", "test_16", partitionids); - System.out.println("offsets: " + offsets); - } - - public Map getKafkaLogSize(String topic, Set partitionIds) { - Properties props = new Properties(); - props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.KAFKA_EAGLE_SYSTEM_GROUP); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); - KafkaConsumer consumer = new KafkaConsumer<>(props); - Map topicPartitionLongMap = new HashMap<>(partitionIds.size()); - for (int partitionId : partitionIds) { - TopicPartition tp = new TopicPartition(topic, partitionId); - long offset = consumer.position(tp); - topicPartitionLongMap.put(tp, offset); - } - consumer.close(); - return topicPartitionLongMap; - } - - public List findTopicPartition(String clusterAlias, String topic) { - KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); - Seq brokerTopicsPaths = zkc.getChildren(BROKER_TOPICS_PATH + "/" + topic + "/partitions"); - List topicAndPartitions = JavaConversions.seqAsJavaList(brokerTopicsPaths); - KafkaResourcePoolUtils.release(clusterAlias, zkc); - return topicAndPartitions; - } - -} diff --git a/src/test/java/org/smartloli/kafka/eagle/factory/TestMx4jProvider.java b/src/test/java/org/smartloli/kafka/eagle/factory/TestMx4jProvider.java deleted file mode 100644 index 30ef6c4..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/factory/TestMx4jProvider.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * - */ -package org.smartloli.kafka.eagle.factory; - -import org.smartloli.kafka.eagle.web.service.Mx4jService; -import org.smartloli.kafka.eagle.web.service.impl.Mx4jServiceImpl; - -/** - * Test Provider clazz. - * - * @author smartloli. - * - * Created by Jul 17, 2017 - */ -public class TestMx4jProvider { - - public static void main(String[] args) { - Mx4jService mx4j = new Mx4jServiceImpl(); - System.out.println(mx4j.bytesInPerSec("slave01:9999")); - } - -} diff --git a/src/test/java/org/smartloli/kafka/eagle/factory/TestZkProvider.java b/src/test/java/org/smartloli/kafka/eagle/factory/TestZkProvider.java deleted file mode 100644 index 97fe281..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/factory/TestZkProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartloli.kafka.eagle.factory; - -import org.smartloli.kafka.eagle.web.service.ZkService; -import org.smartloli.kafka.eagle.web.service.impl.ZkServiceImpl; - -/** - * Test Provider clazz. - * - * @author smartloli. - * - * Created by Jan 17, 2017 - */ -public class TestZkProvider { - public static void main(String[] args) { - ZkService zkService = new ZkServiceImpl(); - System.out.println(zkService.zkCluster("cluster1")); - } -} diff --git a/src/test/java/org/smartloli/kafka/eagle/factory/TestZkTopic.java b/src/test/java/org/smartloli/kafka/eagle/factory/TestZkTopic.java deleted file mode 100644 index a8ba16c..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/factory/TestZkTopic.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartloli.kafka.eagle.factory; - -import java.util.ArrayList; -import java.util.List; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.TypeReference; - -/** - * Test clazz. - * - * @author smartloli. - * - * Created by Jun 1, 2018 - */ -public class TestZkTopic { - public static void main(String[] args) { - String isr = "[0,1,2]"; - List isrInts = JSON.parseObject(isr, new TypeReference>(){}); - System.out.println(isrInts.toString()); - } - -} diff --git a/src/test/java/org/smartloli/kafka/eagle/service/TestClusterService.java b/src/test/java/org/smartloli/kafka/eagle/service/ClusterServiceTest.java similarity index 76% rename from src/test/java/org/smartloli/kafka/eagle/service/TestClusterService.java rename to src/test/java/org/smartloli/kafka/eagle/service/ClusterServiceTest.java index 8fac7fb..9c915f2 100644 --- a/src/test/java/org/smartloli/kafka/eagle/service/TestClusterService.java +++ b/src/test/java/org/smartloli/kafka/eagle/service/ClusterServiceTest.java @@ -17,7 +17,12 @@ */ package org.smartloli.kafka.eagle.service; +import lombok.extern.slf4j.Slf4j; +import org.junit.runner.RunWith; +import org.smartloli.kafka.eagle.web.KafkaEagleBootstrap; import org.smartloli.kafka.eagle.web.service.impl.ClusterServiceImpl; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; /** * Test ClusterService interface. @@ -26,7 +31,10 @@ import org.smartloli.kafka.eagle.web.service.impl.ClusterServiceImpl; * * Created by Jan 16, 2017 */ -public class TestClusterService { +@RunWith(SpringRunner.class) +@SpringBootTest(classes = KafkaEagleBootstrap.class) +@Slf4j +public class ClusterServiceTest { public static void main(String[] args) { ClusterServiceImpl clusterServiceImpl = new ClusterServiceImpl(); diff --git a/src/test/java/org/smartloli/kafka/eagle/service/TestDashboardService.java b/src/test/java/org/smartloli/kafka/eagle/service/DashboardServiceTest.java similarity index 73% rename from src/test/java/org/smartloli/kafka/eagle/service/TestDashboardService.java rename to src/test/java/org/smartloli/kafka/eagle/service/DashboardServiceTest.java index 6417dd0..84739de 100644 --- a/src/test/java/org/smartloli/kafka/eagle/service/TestDashboardService.java +++ b/src/test/java/org/smartloli/kafka/eagle/service/DashboardServiceTest.java @@ -17,6 +17,12 @@ */ package org.smartloli.kafka.eagle.service; +import lombok.extern.slf4j.Slf4j; +import org.junit.runner.RunWith; +import org.smartloli.kafka.eagle.web.KafkaEagleBootstrap; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + /** * Test DashboardService clazz. * @@ -24,7 +30,10 @@ package org.smartloli.kafka.eagle.service; * * Created by Jan 16, 2017 */ -public class TestDashboardService { +@RunWith(SpringRunner.class) +@SpringBootTest(classes = KafkaEagleBootstrap.class) +@Slf4j +public class DashboardServiceTest { public static void main(String[] args) { System.out.println(); diff --git a/src/test/java/org/smartloli/kafka/eagle/service/TestAlarmService.java b/src/test/java/org/smartloli/kafka/eagle/service/TestAlarmService.java deleted file mode 100644 index a4bae6f..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/service/TestAlarmService.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartloli.kafka.eagle.service; - -/** - * Test TestAlarmService interface. - * - * @author smartloli. - * - * Created by Jan 16, 2017 - */ -public class TestAlarmService { - - public static void main(String[] args) { - System.out.println(); - } - -} diff --git a/src/test/java/org/smartloli/kafka/eagle/service/TestConsumerService.java b/src/test/java/org/smartloli/kafka/eagle/service/TestConsumerService.java deleted file mode 100644 index 171a572..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/service/TestConsumerService.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartloli.kafka.eagle.service; - -/** -* Test ConsumerService clazz -* -* @author smartloli. -* -* Created by Jan 16, 2017 -*/ -public class TestConsumerService { - - public static void main(String[] args) { - System.out.println(); - } - -} diff --git a/src/test/java/org/smartloli/kafka/eagle/service/TestTopicService.java b/src/test/java/org/smartloli/kafka/eagle/service/TestTopicService.java deleted file mode 100644 index c82fc6f..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/service/TestTopicService.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartloli.kafka.eagle.service; - -/** -* Test TopicService clazz. -* -* @author smartloli. -* -* Created by Jan 16, 2017 -*/ -public class TestTopicService { - - public static void main(String[] args) { - System.out.println(); - } - -} diff --git a/src/test/java/org/smartloli/kafka/eagle/ipc/TestKSql.java b/src/test/java/org/smartloli/kafka/eagle/sql/KSqlTest.java similarity index 97% rename from src/test/java/org/smartloli/kafka/eagle/ipc/TestKSql.java rename to src/test/java/org/smartloli/kafka/eagle/sql/KSqlTest.java index 29d4dc3..9973ddc 100644 --- a/src/test/java/org/smartloli/kafka/eagle/ipc/TestKSql.java +++ b/src/test/java/org/smartloli/kafka/eagle/sql/KSqlTest.java @@ -16,7 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.smartloli.kafka.eagle.ipc; +package org.smartloli.kafka.eagle.sql; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; @@ -33,7 +33,7 @@ import java.util.List; * * Created by Feb 27, 2018 */ -public class TestKSql { +public class KSqlTest { public static void main(String[] args) throws Exception { // ignite(); diff --git a/src/test/java/org/smartloli/kafka/eagle/sql/TestKafkaParser.java b/src/test/java/org/smartloli/kafka/eagle/sql/KafkaParserTest.java similarity index 63% rename from src/test/java/org/smartloli/kafka/eagle/sql/TestKafkaParser.java rename to src/test/java/org/smartloli/kafka/eagle/sql/KafkaParserTest.java index ab2563d..6ca303f 100644 --- a/src/test/java/org/smartloli/kafka/eagle/sql/TestKafkaParser.java +++ b/src/test/java/org/smartloli/kafka/eagle/sql/KafkaParserTest.java @@ -17,24 +17,38 @@ */ package org.smartloli.kafka.eagle.sql; +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.smartloli.kafka.eagle.web.KafkaEagleBootstrap; import org.smartloli.kafka.eagle.web.sql.execute.KafkaSqlParser; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; /** * Test kafka sql query. - * * @author smartloli. - * - * Created by Feb 28, 2017 + * Created by Feb 28, 2017 */ -public class TestKafkaParser { +@RunWith(SpringRunner.class) +@SpringBootTest(classes = KafkaEagleBootstrap.class) +@Slf4j +public class KafkaParserTest { + + @Autowired + private KafkaSqlParser kafkaSqlParser; - public static void main(String[] args) { + /** + * KQL 语法单元测试 + */ + @Test + public void executeTest() { // String sql = "SELECT \"partition\", \"offset\",\"msg\" from // \"kv-test2019\" where \"partition\" in (0) and \"offset\"=37445 group // by \"partition\" limit 10"; String sql = "select * from \"kv-test2019\" where \"partition\" in (0) limit 10"; - String result = new KafkaSqlParser().execute("cluster1", sql); - System.out.println("result: " + result); + String result = kafkaSqlParser.execute("cluster1", sql); + log.info("result: {}", result); } - } diff --git a/src/test/java/org/smartloli/kafka/eagle/util/TestHttpClientUtils.java b/src/test/java/org/smartloli/kafka/eagle/util/HttpClientUtilsTest.java similarity index 90% rename from src/test/java/org/smartloli/kafka/eagle/util/TestHttpClientUtils.java rename to src/test/java/org/smartloli/kafka/eagle/util/HttpClientUtilsTest.java index 75d9699..20c827e 100644 --- a/src/test/java/org/smartloli/kafka/eagle/util/TestHttpClientUtils.java +++ b/src/test/java/org/smartloli/kafka/eagle/util/HttpClientUtilsTest.java @@ -18,14 +18,13 @@ package org.smartloli.kafka.eagle.util; import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import java.io.IOException; @@ -40,8 +39,8 @@ import java.util.Map; *

* Created by Jan 1, 2019 */ -public class TestHttpClientUtils { - private static final Logger LOG = LoggerFactory.getLogger(TestHttpClientUtils.class); +@Slf4j +public class HttpClientUtilsTest { @Value("kafka.eagle.im.dingding.url") private String dingdingUrl; @@ -67,32 +66,28 @@ public class TestHttpClientUtils { Map dingDingMarkdownMessage = getDingDingMarkdownMessage(title, text, mobiles); HttpPost httpPost = createHttpPost(dingDingMarkdownMessage); if (httpPost == null) { - return; } - LOG.info("send mark down message to ding ding. title:{}, mobiles:{}, text:{}", title, mobiles.toString(), text); + log.info("send mark down message to ding ding. title:{}, mobiles:{}, text:{}", title, mobiles.toString(), text); executeAndGetResponse(httpPost); } /** * send group. - * - * @param string */ public void sendMarkdownToDingDing(String title, String text) { Map dingDingMarkdownMessage = getDingDingMarkdownMessage(title, text, true); HttpPost httpPost = createHttpPost(dingDingMarkdownMessage); if (httpPost == null) { - LOG.error("|resp error|title:{} text:{}|ding ding robot token is null. ", title, text); + log.error("|resp error|title:{} text:{}|ding ding robot token is null. ", title, text); return; } - LOG.info("send mark down message to ding ding. title:{}, text:{}", title, text); + log.info("send mark down message to ding ding. title:{}, text:{}", title, text); executeAndGetResponse(httpPost); } /** * send & get response result. - * * @param httpPost */ private static void executeAndGetResponse(HttpPost httpPost) { @@ -101,10 +96,10 @@ public class TestHttpClientUtils { try { response = httpClient.execute(httpPost); String result = EntityUtils.toString(response.getEntity(), "utf-8"); - LOG.info("dingding server result:" + result); + log.info("dingding server result:" + result); httpClient.close(); } catch (Exception e) { - LOG.error(e.getMessage(), e); + log.error(e.getMessage(), e); } finally { if (httpClient != null) { try { @@ -135,7 +130,6 @@ public class TestHttpClientUtils { Map at = new HashMap<>(); at.put("isAtAll", false); map.put("at", at); - return map; } @@ -159,7 +153,6 @@ public class TestHttpClientUtils { at.put("atMobiles", atMobiles); at.put("isAtAll", false); map.put("at", at); - return map; } } diff --git a/src/test/java/org/smartloli/kafka/eagle/util/TestQuartzManagerUtils.java b/src/test/java/org/smartloli/kafka/eagle/util/QuartzManagerUtilsTest.java similarity index 74% rename from src/test/java/org/smartloli/kafka/eagle/util/TestQuartzManagerUtils.java rename to src/test/java/org/smartloli/kafka/eagle/util/QuartzManagerUtilsTest.java index a576d29..76a4576 100644 --- a/src/test/java/org/smartloli/kafka/eagle/util/TestQuartzManagerUtils.java +++ b/src/test/java/org/smartloli/kafka/eagle/util/QuartzManagerUtilsTest.java @@ -17,6 +17,8 @@ */ package org.smartloli.kafka.eagle.util; +import lombok.extern.slf4j.Slf4j; +import org.smartloli.kafka.eagle.web.job.im.DingDingJob; import org.smartloli.kafka.eagle.web.protocol.alarm.queue.BaseJobContext; import org.smartloli.kafka.eagle.web.util.QuartzManagerUtils; @@ -29,19 +31,18 @@ import java.util.Date; * * Created by Oct 25, 2019 */ -public class TestQuartzManagerUtils { +@Slf4j +public class QuartzManagerUtilsTest { public static void main(String[] args) { - System.out.println("Send msg, date : [" + new Date().toString() + "]"); + log.info("Send msg, date : [" + new Date().toString() + "]"); String jobName = "ke_job_id_" + new Date().getTime(); String jobName2 = "ke_job2_id_" + new Date().getTime(); BaseJobContext bjc = new BaseJobContext(); bjc.setData("test"); bjc.setUrl("http://www.kafka-eagle.org"); - QuartzManagerUtils.addJob(bjc, jobName, TestJob.class, QuartzManagerUtils.getCron(new Date(), 5)); - QuartzManagerUtils.addJob(bjc, jobName2, TestJob.class, QuartzManagerUtils.getCron(new Date(), 10)); - // QuartzManagerUtils.addJob("ke_job_id_" + new Date().getTime(), - // TestJob.class, QuartzManagerUtils.getCron(new Date(), 10)); + QuartzManagerUtils.addJob(bjc, jobName, DingDingJob.class, QuartzManagerUtils.getCron(new Date(), 5)); + QuartzManagerUtils.addJob(bjc, jobName2, DingDingJob.class, QuartzManagerUtils.getCron(new Date(), 10)); } } diff --git a/src/test/java/org/smartloli/kafka/eagle/util/TestJob.java b/src/test/java/org/smartloli/kafka/eagle/util/TestJob.java deleted file mode 100644 index 03b7464..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/util/TestJob.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartloli.kafka.eagle.util; - -import org.quartz.Job; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.smartloli.kafka.eagle.web.constant.KafkaConstants; -import org.smartloli.kafka.eagle.web.protocol.alarm.queue.BaseJobContext; - -import java.util.Date; - -/** - * TODO - * - * @author smartloli. - * - * Created by Oct 25, 2019 - */ -public class TestJob implements Job { - @Override - public void execute(JobExecutionContext arg0) throws JobExecutionException { - BaseJobContext bjc = (BaseJobContext) arg0.getJobDetail().getJobDataMap().get(KafkaConstants.JOB_PARAMS); - System.out.println("kafka eagle quartz job, date : [" + new Date().toString() + "],[" + bjc + "]"); - } -} diff --git a/src/test/java/org/smartloli/kafka/eagle/util/TestNetUtils.java b/src/test/java/org/smartloli/kafka/eagle/util/TestNetUtils.java deleted file mode 100644 index 30f6686..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/util/TestNetUtils.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartloli.kafka.eagle.util; - -import org.smartloli.kafka.eagle.web.util.DateUtils; - -/** - * TODO - * - * @author smartloli. - *

- * Created by Apr 18, 2018 - */ -public class TestNetUtils { - - public static void main(String[] args) { - // System.out.println(NetUtils.telnet("dn1", 9092)); - // System.out.println(NetUtils.ping("nna")); - System.out.println(DateUtils.convertUnixTime(1524249300080L)); - System.out.println(DateUtils.convertUnixTime(1524249900097L)); - } - -} diff --git a/src/test/java/org/smartloli/kafka/eagle/util/TestZKUtils.java b/src/test/java/org/smartloli/kafka/eagle/util/TestZKUtils.java deleted file mode 100644 index ae8ed3c..0000000 --- a/src/test/java/org/smartloli/kafka/eagle/util/TestZKUtils.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartloli.kafka.eagle.util; - -import kafka.zk.KafkaZkClient; -import org.smartloli.kafka.eagle.web.util.KafkaResourcePoolUtils; - -/** - * TODO - * - * @author smartloli. - * - * Created by Oct 28, 2018 - */ -public class TestZKUtils { - - public static void main(String[] args) { - KafkaZkClient zkCli = KafkaResourcePoolUtils.getZookeeperClient("cluster2"); - KafkaResourcePoolUtils.release("cluster2", zkCli); - System.out.println(zkCli); - } - -} -- Gitee From 5a2fab0ed136104c32e028773f8c4747b434461c Mon Sep 17 00:00:00 2001 From: zhiwei_yang Date: Fri, 15 May 2020 18:18:55 +0800 Subject: [PATCH 02/28] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=A4=9A=E4=BD=99?= =?UTF-8?q?=E7=9A=84=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/eagle/web/job/KafkaMetricsJob.java | 2 +- .../eagle/web/job/KafkaTopicMetricsJob.java | 2 +- .../kafka/eagle/web/service/KafkaService.java | 22 ++- .../web/service/impl/ConsumerServiceImpl.java | 6 +- .../service/impl/KafkaMetricsServiceImpl.java | 7 +- .../web/service/impl/KafkaServiceImpl.java | 139 +++++++++--------- .../web/util/KafkaResourcePoolUtils.java | 6 +- .../kafka/eagle/service/KafkaServiceTest.java | 37 +++++ 8 files changed, 131 insertions(+), 90 deletions(-) diff --git a/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaMetricsJob.java b/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaMetricsJob.java index 80654d3..85acc81 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaMetricsJob.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaMetricsJob.java @@ -79,7 +79,7 @@ public class KafkaMetricsJob { @Scheduled(cron = "0 0/1 * * * ?") protected void execute() { if (kafkaEagleMetricsCharts) { - Map> kafkaBrokerInfoMap = kafkaService.getAllBrokerInfos(kafkaClustersConfig.getClusterAllAlias()); + Map> kafkaBrokerInfoMap = kafkaService.getBrokerInfos(kafkaClustersConfig.getClusterAllAlias()); for (SingleClusterConfig singleClusterConfig : kafkaClustersConfig.getClusters()) { List kafkaBrokerInfoList = kafkaBrokerInfoMap.get(singleClusterConfig.getAlias()); kafkaCluster(singleClusterConfig.getAlias(), kafkaBrokerInfoList); //kafka集群数据采集 diff --git a/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaTopicMetricsJob.java b/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaTopicMetricsJob.java index aea7226..47e72d1 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaTopicMetricsJob.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaTopicMetricsJob.java @@ -141,7 +141,7 @@ public class KafkaTopicMetricsJob { Map> consumerGroups = kafkaService.getConsumers(singleClusterConfig.getAlias()); for (Entry> entry : consumerGroups.entrySet()) { String group = entry.getKey(); - for (String topic : kafkaService.getActiveTopic(singleClusterConfig.getAlias(), group)) { + for (String topic : kafkaService.findActiveTopics(singleClusterConfig.getAlias(), group)) { BScreenConsumerInfo bscreenConsumer = new BScreenConsumerInfo(); bscreenConsumer.setCluster(singleClusterConfig.getAlias()); bscreenConsumer.setGroup(group); diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java index 52680b1..00602ea 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java @@ -39,35 +39,41 @@ public interface KafkaService { boolean findTopicAndGroupExist(String clusterAlias, String topic, String group); /** - * Obtaining metadata in zookeeper by topic. + * 获取主题分区信息 + * @param clusterAlias 集群名 + * @param topic 主题名称 + * @return List. */ List findTopicPartition(String clusterAlias, String topic); /** - * Get kafka active consumer topic. + * 获取活跃kafka消费者主题信息 + * @param clusterName kafka系群名 */ - Map> getActiveTopic(String clusterAlias); + Map> findActiveTopics(String clusterName); /** - * Get kafka active consumer topic. + * 获取kafka集群消费组订阅的活跃主题信息 + * @param clusterAlias kafka集群名称 + * @param consumerGroup 消费组 + * @return */ - Set getActiveTopic(String clusterAlias, String group); + Set findActiveTopics(String clusterAlias, String consumerGroup); /** * 通过别名获取Kafka 代理节点信息 * - * @param clusterName kafka集群列表 + * @param clusterName kafka集群名称 * @return */ List getBrokerInfos(String clusterName); /** * 通过别名获取Kafka 代理节点信息 - * * @param clusterNames kafka集群列表 * @return */ - Map> getAllBrokerInfos(List clusterNames); + Map> getBrokerInfos(List clusterNames); /** * Get broker host info from ids. diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/ConsumerServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/ConsumerServiceImpl.java index b68f270..a57478b 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/ConsumerServiceImpl.java @@ -83,7 +83,7 @@ public class ConsumerServiceImpl implements ConsumerService { * Get active graph from zookeeper. */ private String getActiveGraphDatasets(String clusterAlias) { - Map> activeTopics = kafkaService.getActiveTopic(clusterAlias); + Map> activeTopics = kafkaService.findActiveTopics(clusterAlias); JSONObject target = new JSONObject(); JSONArray targets = new JSONArray(); target.put("name", "Active Topics"); @@ -123,7 +123,7 @@ public class ConsumerServiceImpl implements ConsumerService { /** Get kafka active number & storage offset in zookeeper. */ private int getActiveNumber(String clusterAlias, String group, List topics) { - Map> activeTopics = kafkaService.getActiveTopic(clusterAlias); + Map> activeTopics = kafkaService.findActiveTopics(clusterAlias); int sum = 0; for (String topic : topics) { if (activeTopics.containsKey(group + "_" + topic)) { @@ -224,7 +224,7 @@ public class ConsumerServiceImpl implements ConsumerService { /** List the name of the topic in the consumer detail information. */ private String getConsumerDetail(String clusterAlias, String group) { Map> consumers = kafkaService.getConsumers(clusterAlias); - Map> actvTopics = kafkaService.getActiveTopic(clusterAlias); + Map> actvTopics = kafkaService.findActiveTopics(clusterAlias); List kafkaConsumerDetails = new ArrayList(); int id = 0; for (String topic : consumers.get(group)) { diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaMetricsServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaMetricsServiceImpl.java index 9d9f050..10fe2fd 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaMetricsServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaMetricsServiceImpl.java @@ -118,7 +118,7 @@ public class KafkaMetricsServiceImpl implements KafkaMetricsService { return StrUtils.stringifyByObject(sum); } - /** Get topic size from kafka jmx. */ + @Override public JSONObject topicSize(String clusterAlias, String topic) { String jmx = ""; JMXConnector connector = null; @@ -135,14 +135,13 @@ public class KafkaMetricsServiceImpl implements KafkaMetricsService { Object size = mbeanConnection.getAttribute(new ObjectName(objectName), KafkaLog.VALUE.getValue()); tpSize += Long.parseLong(size.toString()); } catch (Exception ex) { - log.error("Get topic size from jmx has error, msg is " + ex.getMessage()); - ex.printStackTrace(); + log.error("Get topic size from jmx has error", ex); } finally { if (connector != null) { try { connector.close(); } catch (IOException e) { - log.error("Close jmx connector has error, msg is " + e.getMessage()); + log.error("Close jmx connector has error", e); } } } diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaServiceImpl.java index 49c8b4f..15133e1 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaServiceImpl.java @@ -107,24 +107,28 @@ public class KafkaServiceImpl implements KafkaService { } /** - * Obtaining metadata in zookeeper by topic. - * @param topic Selected condition. + * 获取主题分区信息 + * @param clusterAlias 集群名 + * @param topic 主题名称 * @return List. */ @Override public List findTopicPartition(String clusterAlias, String topic) { KafkaZkClient kafkaZkClient = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); - Seq brokerTopicsPaths = kafkaZkClient.getChildren(BROKER_TOPICS_PATH + "/" + topic + "/partitions"); - List topicAndPartitions = JavaConversions.seqAsJavaList(brokerTopicsPaths); - KafkaResourcePoolUtils.release(clusterAlias, kafkaZkClient); - return topicAndPartitions; + try { + Seq brokerTopicsPaths = kafkaZkClient.getChildren(BROKER_TOPICS_PATH + "/" + topic + "/partitions"); + List topicAndPartitions = JavaConversions.seqAsJavaList(brokerTopicsPaths); + log.info("kafka集群:[{}], 主题:[{}], topicAndPartitions ==> {}", clusterAlias, topic, topicAndPartitions); + return topicAndPartitions; + }finally { + KafkaResourcePoolUtils.release(clusterAlias, kafkaZkClient); + } } - /** Get kafka active consumer topic. */ @Override - public Map> getActiveTopic(String clusterAlias) { - KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); - Map> actvTopics = new HashMap>(); + public Map> findActiveTopics(String clusterName) { + KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterName); + Map> activeTopics = new HashMap<>(); try { Seq subConsumerPaths = zkc.getChildren(CONSUMERS_PATH); List groups = JavaConversions.seqAsJavaList(subConsumerPaths); @@ -145,51 +149,47 @@ public class KafkaServiceImpl implements KafkaService { JSONObject groupAndTopic = (JSONObject) object; String group = groupAndTopic.getString("group"); String topic = groupAndTopic.getString("topic"); - if (actvTopics.containsKey(group + "_" + topic)) { - actvTopics.get(group + "_" + topic).add(topic); + if (activeTopics.containsKey(group + "_" + topic)) { + activeTopics.get(group + "_" + topic).add(topic); } else { List topics = new ArrayList(); topics.add(topic); - actvTopics.put(group + "_" + topic, topics); + activeTopics.put(group + "_" + topic, topics); } } } catch (Exception ex) { - log.error(ex.getMessage()); + log.error("查询kafka集群:[{}]活跃主题出错", clusterName, ex); } finally { - KafkaResourcePoolUtils.release(clusterAlias, zkc); + KafkaResourcePoolUtils.release(clusterName, zkc); } - return actvTopics; + log.info("查询kafka集群:[{}]活跃主题 ==> {}", clusterName, activeTopics); + return activeTopics; } - /** Get kafka active consumer topic. */ @Override - public Set getActiveTopic(String clusterAlias, String group) { + public Set findActiveTopics(String clusterAlias, String consumerGroup) { KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); Set activeTopics = new HashSet<>(); try { - Seq topics = zkc.getChildren(CONSUMERS_PATH + "/" + group + OWNERS); + Seq topics = zkc.getChildren(CONSUMERS_PATH + "/" + consumerGroup + OWNERS); for (String topic : JavaConversions.seqAsJavaList(topics)) { activeTopics.add(topic); } } catch (Exception ex) { - log.error("Get kafka active topic has error, msg is " + ex.getMessage()); - log.error(ex.getMessage()); + log.error("Get kafka active topic has error", ex); } finally { - if (zkc != null) { - KafkaResourcePoolUtils.release(clusterAlias, zkc); - zkc = null; - } + KafkaResourcePoolUtils.release(clusterAlias, zkc); } return activeTopics; } @Override public List getBrokerInfos(String clusterName) { - return getAllBrokerInfos(Collections.singletonList(clusterName)).get(clusterName); + return getBrokerInfos(Collections.singletonList(clusterName)).get(clusterName); } @Override - public Map> getAllBrokerInfos(List clusterNames) { + public Map> getBrokerInfos(List clusterNames) { if (StringUtils.isEmpty(clusterNames)) { return Collections.emptyMap(); } @@ -990,35 +990,34 @@ public class KafkaServiceImpl implements KafkaService { return version; } - /** Get kafka 0.10.x sasl topic metadata. */ + @Override public List findKafkaLeader(String clusterAlias, String topic) { - List targets = new ArrayList<>(); + List metadataInfos = new ArrayList<>(); KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); - if (zkc.pathExists(BROKER_TOPICS_PATH + "/" + topic)) { - Tuple2, Stat> tuple = zkc.getDataAndStat(BROKER_TOPICS_PATH + "/" + topic); - String tupleString = new String(tuple._1.get()); - JSONObject partitionObject = JSON.parseObject(tupleString).getJSONObject("partitions"); - for (String partition : partitionObject.keySet()) { - String path = String.format(TOPIC_ISR, topic, Integer.valueOf(partition)); - Tuple2, Stat> tuple2 = zkc.getDataAndStat(path); - String tupleString2 = new String(tuple2._1.get()); - JSONObject topicMetadata = JSON.parseObject(tupleString2); - MetadataInfo metadate = new MetadataInfo(); - try { - metadate.setLeader(topicMetadata.getInteger("leader")); - } catch (Exception e) { - log.error("Parse string brokerid to int has error, brokerid[" + topicMetadata.getString("leader") + "]"); - e.printStackTrace(); + try { + if (zkc.pathExists(BROKER_TOPICS_PATH + "/" + topic)) { + Tuple2, Stat> tuple = zkc.getDataAndStat(BROKER_TOPICS_PATH + "/" + topic); + String tupleString = new String(tuple._1.get()); + JSONObject partitionObject = JSON.parseObject(tupleString).getJSONObject("partitions"); + for (String partition : partitionObject.keySet()) { + String path = String.format(TOPIC_ISR, topic, Integer.valueOf(partition)); + Tuple2, Stat> tuple2 = zkc.getDataAndStat(path); + String tupleString2 = new String(tuple2._1.get()); + JSONObject topicMetadata = JSON.parseObject(tupleString2); + MetadataInfo metadata = new MetadataInfo(); + try { + metadata.setLeader(topicMetadata.getInteger("leader")); + } catch (Exception e) { + log.error("Parse string brokerid to int has error, brokerid[" + topicMetadata.getString("leader") + "]", e); + } + metadata.setPartitionId(Integer.parseInt(partition)); + metadataInfos.add(metadata); } - metadate.setPartitionId(Integer.parseInt(partition)); - targets.add(metadate); - } - } - if (zkc != null) { + } + }finally { KafkaResourcePoolUtils.release(clusterAlias, zkc); - zkc = null; } - return targets; + return metadataInfos; } /** Send mock message to kafka topic . */ @@ -1189,31 +1188,31 @@ public class KafkaServiceImpl implements KafkaService { } /** Get broker host and jmx_port info from ids. */ + @Override public String getBrokerJMXFromIds(String clusterAlias, int ids) { String jni = ""; KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); - if (zkc.pathExists(BROKER_IDS_PATH)) { - try { - Tuple2, Stat> tuple = zkc.getDataAndStat(BROKER_IDS_PATH + "/" + ids); - String tupleString = new String(tuple._1.get()); - String host = ""; - if (kafkaClustersConfig.getClusterConfigByName(clusterAlias).getSasl().getEnable()) { - String endpoints = JSON.parseObject(tupleString).getString("endpoints"); - String tmp = endpoints.split("//")[1]; - host = tmp.substring(0, tmp.length() - 2).split(":")[0]; - } else { - host = JSON.parseObject(tupleString).getString("host"); + try { + if (zkc.pathExists(BROKER_IDS_PATH)) { + try { + Tuple2, Stat> tuple = zkc.getDataAndStat(BROKER_IDS_PATH + "/" + ids); + String tupleString = new String(tuple._1.get()); + String host = ""; + if (kafkaClustersConfig.getClusterConfigByName(clusterAlias).getSasl().getEnable()) { + String endpoints = JSON.parseObject(tupleString).getString("endpoints"); + String tmp = endpoints.split("//")[1]; + host = tmp.substring(0, tmp.length() - 2).split(":")[0]; + } else { + host = JSON.parseObject(tupleString).getString("host"); + } + int jmxPort = JSON.parseObject(tupleString).getInteger("jmx_port"); + jni = host + ":" + jmxPort; + } catch (Exception ex) { + log.error("Get broker from ids has error", ex); } - int jmxPort = JSON.parseObject(tupleString).getInteger("jmx_port"); - jni = host + ":" + jmxPort; - } catch (Exception ex) { - log.error("Get broker from ids has error, msg is " + ex.getCause().getMessage()); - ex.printStackTrace(); } - } - if (zkc != null) { + }finally { KafkaResourcePoolUtils.release(clusterAlias, zkc); - zkc = null; } return jni; } diff --git a/src/main/java/org/smartloli/kafka/eagle/web/util/KafkaResourcePoolUtils.java b/src/main/java/org/smartloli/kafka/eagle/web/util/KafkaResourcePoolUtils.java index 5968ace..ac18ea5 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/util/KafkaResourcePoolUtils.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/util/KafkaResourcePoolUtils.java @@ -225,7 +225,7 @@ public final class KafkaResourcePoolUtils implements InitializingBean { if (CollectionUtils.isEmpty(kafkaClustersConfig.getClusters())) { throw new RuntimeException("Kafka集群配置为空,项目无法启动"); } - Map> clusterBrokerInfoMap = kafkaService.getAllBrokerInfos(kafkaClustersConfig.getClusterAllAlias()); + Map> clusterBrokerInfoMap = kafkaService.getBrokerInfos(kafkaClustersConfig.getClusterAllAlias()); log.info("项目启动初始配置kafka集群Broker节点信息:{}", JSON.toJSONString(clusterBrokerInfoMap)); for (SingleClusterConfig singleClusterConfig : kafkaClustersConfig.getClusters()) { @@ -272,7 +272,7 @@ public final class KafkaResourcePoolUtils implements InitializingBean { if (CollectionUtils.isEmpty(kafkaClustersConfig.getClusters())) { throw new RuntimeException("Kafka集群配置为空,项目无法启动"); } - Map> clusterBrokerInfoMap = kafkaService.getAllBrokerInfos(kafkaClustersConfig.getClusterAllAlias()); + Map> clusterBrokerInfoMap = kafkaService.getBrokerInfos(kafkaClustersConfig.getClusterAllAlias()); log.info("项目启动初始配置kafka集群Broker节点信息:{}", JSON.toJSONString(clusterBrokerInfoMap)); for (SingleClusterConfig singleClusterConfig : kafkaClustersConfig.getClusters()) { @@ -319,7 +319,7 @@ public final class KafkaResourcePoolUtils implements InitializingBean { if (CollectionUtils.isEmpty(kafkaClustersConfig.getClusters())) { throw new RuntimeException("Kafka集群配置为空,项目无法启动"); } - Map> clusterBrokerInfoMap = kafkaService.getAllBrokerInfos(kafkaClustersConfig.getClusterAllAlias()); + Map> clusterBrokerInfoMap = kafkaService.getBrokerInfos(kafkaClustersConfig.getClusterAllAlias()); log.info("项目启动初始配置kafka集群Broker节点信息:{}", JSON.toJSONString(clusterBrokerInfoMap)); for (SingleClusterConfig singleClusterConfig : kafkaClustersConfig.getClusters()) { diff --git a/src/test/java/org/smartloli/kafka/eagle/service/KafkaServiceTest.java b/src/test/java/org/smartloli/kafka/eagle/service/KafkaServiceTest.java index 2e49e33..e2c1e23 100644 --- a/src/test/java/org/smartloli/kafka/eagle/service/KafkaServiceTest.java +++ b/src/test/java/org/smartloli/kafka/eagle/service/KafkaServiceTest.java @@ -10,6 +10,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.smartloli.kafka.eagle.web.KafkaEagleBootstrap; import org.smartloli.kafka.eagle.web.constant.KafkaConstants; +import org.smartloli.kafka.eagle.web.protocol.MetadataInfo; import org.smartloli.kafka.eagle.web.service.KafkaService; import org.smartloli.kafka.eagle.web.util.KafkaResourcePoolUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -74,4 +75,40 @@ public class KafkaServiceTest { } KafkaResourcePoolUtils.release(cluster, adminClient); } + + /** + * 获取主题分区名 + */ + @Test + public void findTopicPartitionTest(){ + List partitions = kafkaService.findTopicPartition("cluster1", "MSG_OUTBOUND_APP3331"); + log.info("分区信息 ==> {}", partitions); + } + + /** + * 获取活跃主题信息 + */ + @Test + public void getActiveTopicTest(){ + Map> partitions = kafkaService.findActiveTopics("cluster1"); + log.info("活跃主题信息 ==> {}", partitions); + } + + /** + * 获取活跃主题信息 + */ + @Test + public void findActiveTopicsTest(){ + Set activeTopics = kafkaService.findActiveTopics("cluster1", KafkaConstants.KAFKA_EAGLE_SYSTEM_GROUP); + log.info("活跃主题信息 ==> {}", activeTopics); + } + + /** + * 获取主题Leader信息 + */ + @Test + public void findKafkaLeaderTest(){ + List metadataInfos = kafkaService.findKafkaLeader("cluster1", "MSG_OUTBOUND_APP3331"); + log.info("topic主题Leader消息 ==> {}", metadataInfos); + } } -- Gitee From 6a8ba9fcbea805b267010d5e30e8c28992f11875 Mon Sep 17 00:00:00 2001 From: zhiwei_yang Date: Sat, 16 May 2020 10:21:51 +0800 Subject: [PATCH 03/28] =?UTF-8?q?=E5=AE=8C=E5=96=84=E9=83=A8=E5=88=86?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E6=B3=A8=E9=87=8A=EF=BC=8C=E6=8F=90=E9=AB=98?= =?UTF-8?q?=E5=8F=AF=E8=AF=BB=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../eagle/web/controller/TopicController.java | 6 +- .../kafka/eagle/web/job/KafkaMetricsJob.java | 13 +- .../eagle/web/service/BrokerService.java | 14 +- .../eagle/web/service/ConsumerService.java | 1 - .../kafka/eagle/web/service/KafkaService.java | 62 +++-- .../eagle/web/service/MetricsService.java | 4 +- .../kafka/eagle/web/service/TopicService.java | 18 ++ .../web/service/impl/BrokerServiceImpl.java | 20 +- .../web/service/impl/ConsumerServiceImpl.java | 2 +- .../web/service/impl/KafkaServiceImpl.java | 224 +++++------------- .../web/service/impl/MetricsServiceImpl.java | 58 ++--- .../web/service/impl/Mx4jServiceImpl.java | 6 +- .../web/service/impl/OffsetServiceImpl.java | 4 +- .../web/service/impl/TopicServiceImpl.java | 56 ++++- src/main/resources/application.properties | 2 +- .../kafka/eagle/service/KafkaServiceTest.java | 9 + .../eagle/service/MetricsServiceTest.java | 35 +++ 17 files changed, 274 insertions(+), 260 deletions(-) create mode 100644 src/test/java/org/smartloli/kafka/eagle/service/MetricsServiceTest.java diff --git a/src/main/java/org/smartloli/kafka/eagle/web/controller/TopicController.java b/src/main/java/org/smartloli/kafka/eagle/web/controller/TopicController.java index 36c25cb..a6e874a 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/controller/TopicController.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/controller/TopicController.java @@ -464,7 +464,7 @@ public class TopicController { String ke_topic_partition = request.getParameter("ke_topic_partition"); String ke_topic_repli = request.getParameter("ke_topic_repli"); String clusterAlias = session.getAttribute(KafkaConstants.CLUSTER_ALIAS).toString(); - Map respons = kafkaService.create(clusterAlias, ke_topic_name, ke_topic_partition, ke_topic_repli); + Map respons = topicService.createTopic(clusterAlias, ke_topic_name, ke_topic_partition, ke_topic_repli); if ("success".equals(respons.get("status"))) { session.removeAttribute("Submit_Status"); session.setAttribute("Submit_Status", respons.get("info")); @@ -482,8 +482,8 @@ public class TopicController { public String topicDelete(@PathVariable("topicName") String topicName, @PathVariable("token") String token, HttpSession session) { if (kafkaEagleTopicToken.equals(token) && !KafkaConstants.CONSUMER_OFFSET_TOPIC.equals(topicName)) { String clusterAlias = session.getAttribute(KafkaConstants.CLUSTER_ALIAS).toString(); - Map respons = kafkaService.delete(clusterAlias, topicName); - if ("success".equals(respons.get("status"))) { + Map response = topicService.deleteTopic(clusterAlias, topicName); + if ("success".equals(response.get("status"))) { return "redirect:/topic/list"; } else { return "redirect:/500"; diff --git a/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaMetricsJob.java b/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaMetricsJob.java index 85acc81..707df94 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaMetricsJob.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaMetricsJob.java @@ -213,12 +213,10 @@ public class KafkaMetricsJob { /** * Kafka集群指标数据采集 - * * @param clusterAlias kafka集群名称 */ private void kafkaCluster(String clusterAlias, List brokers) { List list = new ArrayList<>(); - for (String kpi : broker_kpis) { KpiInfo kpiInfo = new KpiInfo(); kpiInfo.setCluster(clusterAlias); @@ -237,7 +235,6 @@ public class KafkaMetricsJob { /** * kafka指标数据装配 - * * @param type 指标数据类型 * @param kpiInfo 指标数据 * @param kafkaBrokerInfo kafka代理服务器信息 @@ -324,6 +321,10 @@ public class KafkaMetricsJob { } } + /** + * Kafka Zookeeper数据采集 + * @param clusterAlias kafka集群节点名称 + */ private void zkCluster(String clusterAlias) { List list = new ArrayList<>(); String zkList = kafkaClustersConfig.getClusterConfigByName(clusterAlias).getZkList(); @@ -334,19 +335,19 @@ public class KafkaMetricsJob { kpiInfo.setTm(DateUtils.getCustomDate("yyyyMMdd")); kpiInfo.setTimespan(DateUtils.getTimeSpan()); kpiInfo.setKey(kpi); - String broker = ""; + StringBuilder broker = new StringBuilder(); for (String zk : zks) { String ip = zk.split(":")[0]; String port = zk.split(":")[1]; if (port.contains("/")) { port = port.split("/")[0]; } - broker += ip + ","; + broker.append(ip).append(","); try { ZkClusterInfo zkInfo = ZKMetricsUtils.zkClusterMntrInfo(ip, Integer.parseInt(port)); zkAssembly(zkInfo, kpi, kpiInfo); } catch (Exception ex) { - log.error("Transcation string[" + port + "] to int has error, msg is ", ex); + log.error("Transcation string[" + port + "] to int has error", ex); } } kpiInfo.setBroker(broker.length() == 0 ? "unkowns" : broker.substring(0, broker.length() - 1)); diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/BrokerService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/BrokerService.java index 8d05e73..8ac9f49 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/BrokerService.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/BrokerService.java @@ -63,10 +63,20 @@ public interface BrokerService { /** Get topic producer logsize total. */ long getTopicLogSizeTotal(String clusterAlias, String topic); - /** Get topic real logsize records. */ + /** + * 获取真实Kafka主题日志数据量 + * @param clusterAlias kafka集群名称 + * @param topic 主题名称 + * @return + */ long getTopicRealLogSize(String clusterAlias, String topic); - /** Get topic producer send logsize records. */ + /** + * 获取kafka集群生产者发送主题日志数量 + * @param clusterAlias kafka集群名称 + * @param topic 主题名称 + * @return + */ long getTopicProducerLogSize(String clusterAlias, String topic); /** Add topic partitions. */ diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/ConsumerService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/ConsumerService.java index 3233198..874aa99 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/ConsumerService.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/ConsumerService.java @@ -71,7 +71,6 @@ public interface ConsumerService { */ int isConsumering(String clusterAlias, String group, String topic); - /** DB storage */ /** Get consumer group count by db. */ long getConsumerCountByDB(String clusterAlias); diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java index 00602ea..b6156b9 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java @@ -26,17 +26,20 @@ import java.util.Set; /** * Kafka group,topic and partition interface. - * * @author smartloli. - * - * Created by Jan 18, 2017 - * - * Update by hexiang 20170216 + * Created by Jan 18, 2017 + * Update by hexiang 20170216 */ public interface KafkaService { - /** Find topic and group exist in zookeeper. */ - boolean findTopicAndGroupExist(String clusterAlias, String topic, String group); + /** + * 查询消费组是否消费指定主题 + * @param clusterAlias kafka集群名称 + * @param topic 主题名称 + * @param consumerGroup 消费组名称 + * @return + */ + boolean findTopicExistInGroup(String clusterAlias, String topic, String consumerGroup); /** * 获取主题分区信息 @@ -81,17 +84,28 @@ public interface KafkaService { String getBrokerJMXFromIds(String clusterAlias, int ids); /** - * Obtaining kafka consumer information from zookeeper. + * 从Zookeeper获取消费者信息 + * @param clusterAlias kafka集群名称 */ Map> getConsumers(String clusterAlias); /** - * Obtaining kafka consumer page information from zookeeper. + * 从Zookeeper获取分页消费者信息 + * + * @param clusterAlias kafka集群名称 + * @param displayInfo 分页请求 */ - Map> getConsumers(String clusterAlias, DisplayInfo page); + Map> getConsumers(String clusterAlias, DisplayInfo displayInfo); - /** According to group, topic and partition to get offset from zookeeper. */ - OffsetZkInfo getOffset(String clusterAlias, String topic, String group, int partition); + /** + * 获取消费组监听主题的偏移量 + * @param clusterAlias kafka集群名称 + * @param topic 主题名称 + * @param group 消费组名 + * @param partition 分区号 + * @return + */ + OffsetZkInfo getGroupTopicPartitionOffset(String clusterAlias, String topic, String group, int partition); /** Get kafka 0.10.x offset from topic. */ String getKafkaOffset(String clusterAlias); @@ -99,12 +113,6 @@ public interface KafkaService { /** Get the data for the topic partition in the specified consumer group */ Map getKafkaOffset(String clusterAlias, String group, String topic, Set partitionids); - /** Use kafka console comand to create topic. */ - Map create(String clusterAlias, String topicName, String partitions, String replic); - - /** Use kafka console command to delete topic. */ - Map delete(String clusterAlias, String topicName); - /** * 解析集群代理服务器信息 * @param clusterAlias kafka集群名称 @@ -131,7 +139,7 @@ public interface KafkaService { /** Get kafka consumer information pages. */ String getKafkaActiverSize(String clusterAlias, String group); /** Get kafka consumer information pages not owners. */ - OwnerInfo getKafkaActiverNotOwners(String clusterAlias, String group); + OwnerInfo getKafkaActiveNotOwners(String clusterAlias, String group); /** Get kafka broker bootstrap server. */ String getKafkaBrokerServer(String clusterAlias); @@ -146,16 +154,16 @@ public interface KafkaService { long getKafkaLogSize(String clusterAlias, String topic, int partitionid); /** Get kafka topic history batch logsize. */ - Map getKafkaLogSize(String clusterAlias, String topic, Set partitionids); + Map getKafkaLogSize(String clusterAlias, String topic, Set partitionIds); /** Get kafka topic real logsize by partitionid. */ - long getKafkaRealLogSize(String clusterAlias, String topic, int partitionid); + long getKafkaRealLogSize(String clusterAlias, String topic, int partitionId); /** Get kafka topic real logsize by partitionid set. */ - long getKafkaRealLogSize(String clusterAlias, String topic, Set partitionids); + long getKafkaRealLogSize(String clusterAlias, String topic, Set partitionIds); /** Get topic producer send logsize records. */ - long getKafkaProducerLogSize(String clusterAlias, String topic, Set partitionids); + long getKafkaProducerLogSize(String clusterAlias, String topic, Set partitionIds); /** Get kafka sasl topic metadate. */ List findKafkaLeader(String clusterAlias, String topic); @@ -179,9 +187,13 @@ public interface KafkaService { long getRealLogSize(String clusterAlias, String topic, int partitionid); /** - * Get topic metadata. + * 获取主题分区的复制分区编号信息 + * @param clusterAlias kafka集群信息 + * @param topic 主题名称 + * @param partitionId 分区id + * @return */ - String getReplicasIsr(String clusterAlias, String topic, int partitionid); + String getTopicPartitionReplicas(String clusterAlias, String topic, int partitionId); /** * Get kafka version. diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/MetricsService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/MetricsService.java index c643d4f..206d25b 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/MetricsService.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/MetricsService.java @@ -36,7 +36,9 @@ import java.util.Map; public interface MetricsService { /** - * Gets summary monitoring data for all broker. + * 获取Kafka集群所有代理节点监控数据 + * @param clusterAlias kafka集群名称 + * @return */ String getAllBrokersMBean(String clusterAlias); diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/TopicService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/TopicService.java index 48f6f48..ec61d89 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/TopicService.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/TopicService.java @@ -34,6 +34,24 @@ import java.util.Map; */ public interface TopicService { + /** + * 创建主题 + * @param clusterAlias kafka集群名称 + * @param topicName 主题名称 + * @param partitions 分区数 + * @param replica 复制分区数 + * @return + */ + Map createTopic(String clusterAlias, String topicName, String partitions, String replica); + + /** + * 删除主题 + * @param clusterAlias kafka集群名称 + * @param topicName 主题名称 + * @return + */ + Map deleteTopic(String clusterAlias, String topicName); + /** * Find topic name in all topics. */ diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/BrokerServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/BrokerServiceImpl.java index 53c0bfa..300b7c4 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/BrokerServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/BrokerServiceImpl.java @@ -432,7 +432,7 @@ public class BrokerServiceImpl implements BrokerService { metadate.setIsr(topicMetadata.getString("isr")); metadate.setLeader(topicMetadata.getInteger("leader")); metadate.setPartitionId(partition); - metadate.setReplicas(kafkaService.getReplicasIsr(clusterAlias, topic, partition)); + metadate.setReplicas(kafkaService.getTopicPartitionReplicas(clusterAlias, topic, partition)); long logSize = 0L; if ("kafka".equals(kafkaClustersConfig.getClusterConfigByName(clusterAlias).getOffsetStorage())) { logSize = kafkaService.getKafkaRealLogSize(clusterAlias, topic, partition); @@ -493,7 +493,7 @@ public class BrokerServiceImpl implements BrokerService { metadate.setIsr(topicMetadata.getString("isr")); metadate.setLeader(topicMetadata.getInteger("leader")); metadate.setPartitionId(Integer.valueOf(partition)); - metadate.setReplicas(kafkaService.getReplicasIsr(clusterAlias, topic, Integer.valueOf(partition))); + metadate.setReplicas(kafkaService.getTopicPartitionReplicas(clusterAlias, topic, Integer.valueOf(partition))); targets.add(metadate); } } @@ -547,7 +547,6 @@ public class BrokerServiceImpl implements BrokerService { return logSize; } - /** Get topic real logsize records. */ @Override public long getTopicRealLogSize(String clusterAlias, String topic) { long logSize = 0L; @@ -589,10 +588,10 @@ public class BrokerServiceImpl implements BrokerService { if (KafkaConstants.CONSUMER_OFFSET_TOPIC.equals(topic)) { return logSize; } - KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); + KafkaZkClient zookeeperClient = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); try { - if (zkc.pathExists(BROKER_TOPICS_PATH + "/" + topic)) { - Tuple2, Stat> tuple = zkc.getDataAndStat(BROKER_TOPICS_PATH + "/" + topic); + if (zookeeperClient.pathExists(BROKER_TOPICS_PATH + "/" + topic)) { + Tuple2, Stat> tuple = zookeeperClient.getDataAndStat(BROKER_TOPICS_PATH + "/" + topic); String tupleString = new String(tuple._1.get()); JSONObject partitionObject = JSON.parseObject(tupleString).getJSONObject("partitions"); Set partitions = new HashSet<>(); @@ -611,12 +610,9 @@ public class BrokerServiceImpl implements BrokerService { } } } catch (Exception e) { - log.error("Get topic real logsize has error, msg is " + e.getCause().getMessage()); - e.printStackTrace(); - } - if (zkc != null) { - KafkaResourcePoolUtils.release(clusterAlias, zkc); - zkc = null; + log.error("Get topic real logsize has error", e); + }finally { + KafkaResourcePoolUtils.release(clusterAlias, zookeeperClient); } return logSize; } diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/ConsumerServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/ConsumerServiceImpl.java index a57478b..4c4ca7f 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/ConsumerServiceImpl.java @@ -211,7 +211,7 @@ public class ConsumerServiceImpl implements ConsumerService { consumer.setGroup(group); consumer.setId(++id); consumer.setNode(consumerGroup.getString("node")); - OwnerInfo ownerInfo = kafkaService.getKafkaActiverNotOwners(clusterAlias, group); + OwnerInfo ownerInfo = kafkaService.getKafkaActiveNotOwners(clusterAlias, group); consumer.setTopics(ownerInfo.getTopicSets().size()); consumer.setActiveTopics(getKafkaActiveTopicNumbers(clusterAlias, group)); consumer.setActiveThreads(ownerInfo.getActiveSize()); diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaServiceImpl.java index 15133e1..071c43f 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaServiceImpl.java @@ -90,28 +90,17 @@ public class KafkaServiceImpl implements KafkaService { @Autowired private KafkaClustersConfig kafkaClustersConfig; - /** - * Find topic and group exist in zookeeper. - * - * @param topic Filter topic. - * @param group Filter group - * @return Boolean. - */ @Override - public boolean findTopicAndGroupExist(String clusterAlias, String topic, String group) { + public boolean findTopicExistInGroup(String clusterAlias, String topic, String consumerGroup) { KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); - String ownersPath = CONSUMERS_PATH + "/" + group + "/owners/" + topic; - boolean status = zkc.pathExists(ownersPath); - KafkaResourcePoolUtils.release(clusterAlias, zkc); - return status; + try { + String ownersPath = CONSUMERS_PATH + "/" + consumerGroup + "/owners/" + topic; + return zkc.pathExists(ownersPath); + }finally { + KafkaResourcePoolUtils.release(clusterAlias, zkc); + } } - /** - * 获取主题分区信息 - * @param clusterAlias 集群名 - * @param topic 主题名称 - * @return List. - */ @Override public List findTopicPartition(String clusterAlias, String topic) { KafkaZkClient kafkaZkClient = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); @@ -234,11 +223,10 @@ public class KafkaServiceImpl implements KafkaService { return clusterBrokerInfoMap; } - /** Obtaining kafka consumer information from zookeeper. */ @Override public Map> getConsumers(String clusterAlias) { KafkaZkClient kafkaZkClient = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); - Map> consumers = new HashMap>(); + Map> consumers = new HashMap<>(); try { Seq subConsumerPaths = kafkaZkClient.getChildren(CONSUMERS_PATH); List groups = JavaConversions.seqAsJavaList(subConsumerPaths); @@ -252,26 +240,24 @@ public class KafkaServiceImpl implements KafkaService { log.error("Consumer Path[" + path + "] is not exist."); } } - } catch (Exception ex) { - log.error(ex.getMessage()); } finally { KafkaResourcePoolUtils.release(clusterAlias, kafkaZkClient); } + log.info("查询kafka集群[{}]消费者信息 ==> {}", clusterAlias, consumers); return consumers; } - /** Obtaining kafka consumer page information from zookeeper. */ @Override - public Map> getConsumers(String clusterAlias, DisplayInfo page) { + public Map> getConsumers(String clusterAlias, DisplayInfo displayInfo) { KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); - Map> consumers = new HashMap>(); + Map> consumers = new HashMap<>(); try { - if (page.getSearch().length() > 0) { - String path = CONSUMERS_PATH + "/" + page.getSearch() + "/owners"; + if (displayInfo.getSearch().length() > 0) { + String path = CONSUMERS_PATH + "/" + displayInfo.getSearch() + "/owners"; if (zkc.pathExists(path)) { Seq owners = zkc.getChildren(path); List ownersSerialize = JavaConversions.seqAsJavaList(owners); - consumers.put(page.getSearch(), ownersSerialize); + consumers.put(displayInfo.getSearch(), ownersSerialize); } else { log.error("Consumer Path[" + path + "] is not exist."); } @@ -280,7 +266,7 @@ public class KafkaServiceImpl implements KafkaService { List groups = JavaConversions.seqAsJavaList(subConsumersPaths); int offset = 0; for (String group : groups) { - if (offset < (page.getDisplayLength() + page.getDisplayStart()) && offset >= page.getDisplayStart()) { + if (offset < (displayInfo.getDisplayLength() + displayInfo.getDisplayStart()) && offset >= displayInfo.getDisplayStart()) { String path = CONSUMERS_PATH + "/" + group + "/owners"; if (zkc.pathExists(path)) { Seq owners = zkc.getChildren(path); @@ -293,31 +279,15 @@ public class KafkaServiceImpl implements KafkaService { offset++; } } - } catch (Exception ex) { - log.error(ex.getMessage()); - } finally { - if (zkc != null) { - KafkaResourcePoolUtils.release(clusterAlias, zkc); - zkc = null; - } + } finally { + KafkaResourcePoolUtils.release(clusterAlias, zkc); } + log.info("kafka集群:[{}],分页:[{}] 消费者信息 ==> {}", clusterAlias, displayInfo, consumers); return consumers; } - /** - * According to group, topic and partition to get offset from zookeeper. - * - * @param topic - * Filter topic. - * @param group - * Filter group. - * @param partition - * Filter partition. - * @return OffsetZkInfo. - * @see OffsetZkInfo - */ @Override - public OffsetZkInfo getOffset(String clusterAlias, String topic, String group, int partition) { + public OffsetZkInfo getGroupTopicPartitionOffset(String clusterAlias, String topic, String group, int partition) { KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); OffsetZkInfo offsetZk = new OffsetZkInfo(); String offsetPath = CONSUMERS_PATH + "/" + group + "/offsets/" + topic + "/" + partition; @@ -328,51 +298,41 @@ public class KafkaServiceImpl implements KafkaService { tuple = zkc.getDataAndStat(offsetPath); } else { log.info("Partition[" + partition + "],OffsetPath[" + offsetPath + "] is not exist!"); - if (zkc != null) { - KafkaResourcePoolUtils.release(clusterAlias, zkc); - zkc = null; - } return offsetZk; } + + String tupleString = new String(tuple._1.get()); + long offsetSize = Long.parseLong(tupleString); + if (zkc.pathExists(ownersPath)) { + Tuple2, Stat> tuple2 = zkc.getDataAndStat(ownersPath); + String tupleString2 = new String(tuple2._1.get()); + offsetZk.setOwners(tupleString2 == null ? "" : tupleString2); + } else { + offsetZk.setOwners(""); + } + offsetZk.setOffset(offsetSize); + offsetZk.setCreate(DateUtils.convertUnixTime2Date(tuple._2.getCtime())); + offsetZk.setModify(DateUtils.convertUnixTime2Date(tuple._2.getMtime())); } catch (Exception ex) { log.error("Partition[" + partition + "],get offset has error", ex); - if (zkc != null) { - KafkaResourcePoolUtils.release(clusterAlias, zkc); - zkc = null; - } - return offsetZk; - } - String tupleString = new String(tuple._1.get()); - long offsetSize = Long.parseLong(tupleString); - if (zkc.pathExists(ownersPath)) { - Tuple2, Stat> tuple2 = zkc.getDataAndStat(ownersPath); - String tupleString2 = new String(tuple2._1.get()); - offsetZk.setOwners(tupleString2 == null ? "" : tupleString2); - } else { - offsetZk.setOwners(""); - } - offsetZk.setOffset(offsetSize); - offsetZk.setCreate(DateUtils.convertUnixTime2Date(tuple._2.getCtime())); - offsetZk.setModify(DateUtils.convertUnixTime2Date(tuple._2.getMtime())); - if (zkc != null) { + }finally { KafkaResourcePoolUtils.release(clusterAlias, zkc); - zkc = null; } return offsetZk; } - /** According to topic and partition to obtain Replicas & Isr. */ @Override - public String getReplicasIsr(String clusterAlias, String topic, int partitionId) { + public String getTopicPartitionReplicas(String clusterAlias, String topic, int partitionId) { KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); - TopicPartition tp = new TopicPartition(topic, partitionId); - Seq replis = zkc.getReplicasForPartition(tp); - List targets = JavaConversions.seqAsJavaList(replis); - if (zkc != null) { + try { + TopicPartition topicPartition = new TopicPartition(topic, partitionId); + Seq replicas = zkc.getReplicasForPartition(topicPartition); + String result = JSON.toJSONString(JavaConversions.seqAsJavaList(replicas)); + log.info("kafka集群[{}],主题:[{}], 分区号:[{}], 复制分区信息 ==> {}", clusterAlias, topic, partitionId, result); + return result; + }finally { KafkaResourcePoolUtils.release(clusterAlias, zkc); - zkc = null; } - return targets.toString(); } /** Get zookeeper cluster information. */ @@ -395,72 +355,20 @@ public class KafkaServiceImpl implements KafkaService { public JSONObject zkCliStatus(String clusterAlias) { JSONObject target = new JSONObject(); KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); - if (zkc != null) { - target.put("live", true); - target.put("list", kafkaClustersConfig.getClusterConfigByName(clusterAlias).getZkList()); - } else { - target.put("live", false); - target.put("list", kafkaClustersConfig.getClusterConfigByName(clusterAlias).getZkList()); - } - if (zkc != null) { + try{ + if (zkc != null) { + target.put("live", true); + target.put("list", kafkaClustersConfig.getClusterConfigByName(clusterAlias).getZkList()); + } else { + target.put("live", false); + target.put("list", kafkaClustersConfig.getClusterConfigByName(clusterAlias).getZkList()); + } + }finally { KafkaResourcePoolUtils.release(clusterAlias, zkc); - zkc = null; } return target; } - /** - * Create topic to kafka cluster, it is worth noting that the backup number - * must be less than or equal to brokers data. - * - * @param topicName - * Create topic name. - * @param partitions - * Create topic partitions. - * @param replic - * Replic numbers. - * @return Map. - */ - @Override - public Map create(String clusterAlias, String topicName, String partitions, String replic) { - Map targets = new HashMap(); - int brokers = getBrokerInfos(clusterAlias).size(); - if (Integer.parseInt(replic) > brokers) { - targets.put("status", "error"); - targets.put("info", "replication factor: " + replic + " larger than available brokers: " + brokers); - return targets; - } - AdminClient adminClient = KafkaResourcePoolUtils.getKafkaClient(clusterAlias); - try { - NewTopic newTopic = new NewTopic(topicName, Integer.valueOf(partitions), Short.valueOf(replic)); - adminClient.createTopics(Collections.singleton(newTopic)).all().get(); - } catch (Exception e) { - log.info("Create kafka topic has error", e); - } finally { - KafkaResourcePoolUtils.release(clusterAlias, adminClient); - } - - targets.put("status", "success"); - targets.put("info", "Create topic[" + topicName + "] has successed,partitions numbers is [" + partitions + "],replication-factor numbers is [" + replic + "]"); - return targets; - } - - /** Delete topic to kafka cluster. */ - public Map delete(String clusterAlias, String topicName) { - Map targets = new HashMap(); - AdminClient adminClient = KafkaResourcePoolUtils.getKafkaClient(clusterAlias); - try { - adminClient.deleteTopics(Collections.singleton(topicName)).all().get(); - targets.put("status", "success"); - } catch (Exception e) { - log.error("Delete kafka topic has error", e); - targets.put("status", "failed"); - } finally { - KafkaResourcePoolUtils.release(clusterAlias, adminClient); - } - return targets; - } - /** Get kafka brokers from zookeeper. */ private List getBrokers(String clusterAlias) { List targets = new ArrayList(); @@ -748,7 +656,7 @@ public class KafkaServiceImpl implements KafkaService { } /** Get kafka consumer information pages not owners. */ - public OwnerInfo getKafkaActiverNotOwners(String clusterAlias, String group) { + public OwnerInfo getKafkaActiveNotOwners(String clusterAlias, String group) { OwnerInfo ownerInfo = new OwnerInfo(); JSONArray consumerGroups = getKafkaMetadata(parseBrokerServer(clusterAlias), group, clusterAlias); int activerCounter = 0; @@ -889,10 +797,10 @@ public class KafkaServiceImpl implements KafkaService { } /** Get kafka 0.10.x topic real logsize by partitionid. */ - public long getKafkaRealLogSize(String clusterAlias, String topic, int partitionid) { + public long getKafkaRealLogSize(String clusterAlias, String topic, int partitionId) { long realLogSize = 0L; KafkaConsumer consumer = KafkaResourcePoolUtils.getKafkaConsumer(clusterAlias); - TopicPartition tp = new TopicPartition(topic, partitionid); + TopicPartition tp = new TopicPartition(topic, partitionId); consumer.assign(Collections.singleton(tp)); java.util.Map endLogSize = consumer.endOffsets(Collections.singleton(tp)); java.util.Map startLogSize = consumer.beginningOffsets(Collections.singleton(tp)); @@ -907,11 +815,11 @@ public class KafkaServiceImpl implements KafkaService { } /** Get kafka 0.10.x topic real logsize by partitionid set. */ - public long getKafkaRealLogSize(String clusterAlias, String topic, Set partitionids) { + public long getKafkaRealLogSize(String clusterAlias, String topic, Set partitionIds) { long realLogSize = 0L; KafkaConsumer consumer = KafkaResourcePoolUtils.getKafkaConsumer(clusterAlias); Set tps = new HashSet<>(); - for (int partitionId : partitionids) { + for (int partitionId : partitionIds) { TopicPartition tp = new TopicPartition(topic, partitionId); tps.add(tp); } @@ -1038,13 +946,12 @@ public class KafkaServiceImpl implements KafkaService { List partitions = findTopicPartition(clusterAlias, topic); for (String partition : partitions) { int partitionInt = Integer.parseInt(partition); - OffsetZkInfo offsetZk = getOffset(clusterAlias, topic, group, partitionInt); + OffsetZkInfo offsetZk = getGroupTopicPartitionOffset(clusterAlias, topic, group, partitionInt); long logSize = getLogSize(clusterAlias, topic, partitionInt); lag += logSize - offsetZk.getOffset(); } } catch (Exception e) { - log.error("Get cluser[" + clusterAlias + "] active group[" + group + "] topic[" + topic + "] lag has error, msg is " + e.getMessage()); - e.printStackTrace(); + log.error("Get cluster[" + clusterAlias + "] active group[" + group + "] topic[" + topic + "] lag has error", e); } return lag; } @@ -1116,32 +1023,29 @@ public class KafkaServiceImpl implements KafkaService { List brokers = getBrokerInfos(clusterAlias); for (KafkaBrokerInfo broker : brokers) { try { - JMXServiceURL jmxSeriverUrl = new JMXServiceURL(String.format(JMX, broker.getHost() + ":" + broker.getJmxPort())); - connector = JMXFactoryUtils.connectWithTimeout(jmxSeriverUrl, 30, TimeUnit.SECONDS); + JMXServiceURL jmxServerUrl = new JMXServiceURL(String.format(JMX, broker.getHost() + ":" + broker.getJmxPort())); + connector = JMXFactoryUtils.connectWithTimeout(jmxServerUrl, 30, TimeUnit.SECONDS); if (connector != null) { break; } } catch (Exception e) { - log.error("Get kafka old version logsize has error, msg is " + e.getMessage()); - e.printStackTrace(); + log.error("Get kafka old version logsize has error", e); } } long logSize = 0L; try { MBeanServerConnection mbeanConnection = connector.getMBeanServerConnection(); - for (int partitionid : partitionIds) { - logSize += Long.parseLong(mbeanConnection.getAttribute(new ObjectName(String.format(KafkaServer8.END_LOG_SIZE.getValue(), topic, partitionid)), KafkaServer8.VALUE.getValue()).toString()); + for (int partitionId : partitionIds) { + logSize += Long.parseLong(mbeanConnection.getAttribute(new ObjectName(String.format(KafkaServer8.END_LOG_SIZE.getValue(), topic, partitionId)), KafkaServer8.VALUE.getValue()).toString()); } } catch (Exception ex) { - log.error("Get kafka old version logsize & parse has error, msg is " + ex.getMessage()); - ex.printStackTrace(); + log.error("Get kafka old version logsize & parse has error", ex); } finally { if (connector != null) { try { connector.close(); } catch (IOException e) { - log.error("Close jmx connector has error, msg is " + e.getMessage()); - e.printStackTrace(); + log.error("Close jmx connector has error", e); } } } diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/MetricsServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/MetricsServiceImpl.java index c46fe95..5cc9e14 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/MetricsServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/MetricsServiceImpl.java @@ -20,6 +20,7 @@ package org.smartloli.kafka.eagle.web.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; import org.smartloli.kafka.eagle.web.config.KafkaClustersConfig; import org.smartloli.kafka.eagle.web.constant.KafkaConstants; import org.smartloli.kafka.eagle.web.constant.MBeanConstants; @@ -54,6 +55,7 @@ import java.util.Map.Entry; * Created by Jul 17, 2017 Update by No 3, 2018 by cocodroid */ @Service +@Slf4j public class MetricsServiceImpl implements MetricsService { @Autowired @@ -77,20 +79,19 @@ public class MetricsServiceImpl implements MetricsService { @Autowired private KafkaClustersConfig kafkaClustersConfig; - /** - * Gets summary monitoring data for all broker. - */ + @Override public String getAllBrokersMBean(String clusterAlias) { - String result = ""; List brokers = kafkaService.getBrokerInfos(clusterAlias); int brokerSize = kafkaClustersConfig.getClusterConfigByName(clusterAlias).getBrokerSize(); + String result = null; if (brokers.size() <= brokerSize) { - result = getOnlineAllBrokersMBean(clusterAlias, brokers); + result = getOnlineAllBrokersMBean(brokers); } else { Map params = new HashMap<>(); params.put("cluster", clusterAlias); result = getOfflineAllBrokersMBean(params); } + log.info("查询Kafka集群[{}]代理节点监控信息 ==> {}", clusterAlias, result); return result; } @@ -146,9 +147,13 @@ public class MetricsServiceImpl implements MetricsService { return JSON.toJSONString(mbeans); } - /** Gets summary online monitoring data for all broker. */ - private String getOnlineAllBrokersMBean(String clusterAlias, List brokers) { - Map mbeans = new HashMap<>(); + /** + * 获取在线节点监控信息 + * @param brokers + * @return + */ + private String getOnlineAllBrokersMBean(List brokers) { + Map mbeanInfoMap = new HashMap<>(); for (KafkaBrokerInfo broker : brokers) { String uri = broker.getHost() + ":" + broker.getJmxPort(); MBeanInfo bytesIn = mx4jService.bytesInPerSec(uri); @@ -163,30 +168,19 @@ public class MetricsServiceImpl implements MetricsService { MBeanInfo replicationBytesInPerSec = mx4jService.replicationBytesInPerSec(uri); MBeanInfo replicationBytesOutPerSec = mx4jService.replicationBytesOutPerSec(uri); - assembleMBeanInfo(mbeans, MBeanConstants.MESSAGES_IN, messageIn); - - assembleMBeanInfo(mbeans, MBeanConstants.BYTES_IN, bytesIn); - - assembleMBeanInfo(mbeans, MBeanConstants.BYTES_OUT, bytesOut); - - assembleMBeanInfo(mbeans, MBeanConstants.BYTES_REJECTED, bytesRejected); - - assembleMBeanInfo(mbeans, MBeanConstants.FAILED_FETCH_REQUEST, failedFetchRequest); - - assembleMBeanInfo(mbeans, MBeanConstants.FAILED_PRODUCE_REQUEST, failedProduceRequest); - - assembleMBeanInfo(mbeans, MBeanConstants.PRODUCEMESSAGECONVERSIONS, produceMessageConversions); - - assembleMBeanInfo(mbeans, MBeanConstants.TOTALFETCHREQUESTSPERSEC, totalFetchRequests); - - assembleMBeanInfo(mbeans, MBeanConstants.TOTALPRODUCEREQUESTSPERSEC, totalProduceRequests); - - assembleMBeanInfo(mbeans, MBeanConstants.REPLICATIONBYTESINPERSEC, replicationBytesInPerSec); - - assembleMBeanInfo(mbeans, MBeanConstants.REPLICATIONBYTESOUTPERSEC, replicationBytesOutPerSec); - + assembleMBeanInfo(mbeanInfoMap, MBeanConstants.MESSAGES_IN, messageIn); + assembleMBeanInfo(mbeanInfoMap, MBeanConstants.BYTES_IN, bytesIn); + assembleMBeanInfo(mbeanInfoMap, MBeanConstants.BYTES_OUT, bytesOut); + assembleMBeanInfo(mbeanInfoMap, MBeanConstants.BYTES_REJECTED, bytesRejected); + assembleMBeanInfo(mbeanInfoMap, MBeanConstants.FAILED_FETCH_REQUEST, failedFetchRequest); + assembleMBeanInfo(mbeanInfoMap, MBeanConstants.FAILED_PRODUCE_REQUEST, failedProduceRequest); + assembleMBeanInfo(mbeanInfoMap, MBeanConstants.PRODUCEMESSAGECONVERSIONS, produceMessageConversions); + assembleMBeanInfo(mbeanInfoMap, MBeanConstants.TOTALFETCHREQUESTSPERSEC, totalFetchRequests); + assembleMBeanInfo(mbeanInfoMap, MBeanConstants.TOTALPRODUCEREQUESTSPERSEC, totalProduceRequests); + assembleMBeanInfo(mbeanInfoMap, MBeanConstants.REPLICATIONBYTESINPERSEC, replicationBytesInPerSec); + assembleMBeanInfo(mbeanInfoMap, MBeanConstants.REPLICATIONBYTESOUTPERSEC, replicationBytesOutPerSec); } - for (Entry entry : mbeans.entrySet()) { + for (Entry entry : mbeanInfoMap.entrySet()) { if (entry == null || entry.getValue() == null) { continue; } @@ -195,7 +189,7 @@ public class MetricsServiceImpl implements MetricsService { entry.getValue().setMeanRate(StrUtils.assembly(entry.getValue().getMeanRate())); entry.getValue().setOneMinute(StrUtils.assembly(entry.getValue().getOneMinute())); } - return JSON.toJSONString(mbeans); + return JSON.toJSONString(mbeanInfoMap); } private void assembleMBeanInfo(Map mbeans, String mBeanInfoKey, MBeanInfo mBeanInfo) { diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/Mx4jServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/Mx4jServiceImpl.java index 0b0688a..b19f31b 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/Mx4jServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/Mx4jServiceImpl.java @@ -242,8 +242,7 @@ public class Mx4jServiceImpl implements Mx4jService { mbeanInfo.setOneMinute("0.0"); } } catch (Exception e) { - log.error("JMX service url[" + uri + "] create has error,msg is " + e.getMessage()); - e.printStackTrace(); + log.error("JMX service url[" + uri + "] create has error", e); mbeanInfo.setFifteenMinute("0.0"); mbeanInfo.setFiveMinute("0.0"); mbeanInfo.setMeanRate("0.0"); @@ -253,8 +252,7 @@ public class Mx4jServiceImpl implements Mx4jService { try { connector.close(); } catch (Exception e) { - log.error("Close JMXConnector[" + uri + "] has error,msg is " + e.getMessage()); - e.printStackTrace(); + log.error("Close JMXConnector[" + uri + "] has error", e); } } } diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/OffsetServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/OffsetServiceImpl.java index 102ef14..80bce66 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/OffsetServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/OffsetServiceImpl.java @@ -85,7 +85,7 @@ public class OffsetServiceImpl implements OffsetService { List targets = new ArrayList(); for (String partition : partitions) { int partitionInt = Integer.parseInt(partition); - OffsetZkInfo offsetZk = kafkaService.getOffset(clusterAlias, topic, group, partitionInt); + OffsetZkInfo offsetZk = kafkaService.getGroupTopicPartitionOffset(clusterAlias, topic, group, partitionInt); OffsetInfo offsetInfo = new OffsetInfo(); long logSize = 0L; if ("kafka".equals(kafkaClustersConfig.getClusterConfigByName(clusterAlias).getOffsetStorage())) { @@ -142,7 +142,7 @@ public class OffsetServiceImpl implements OffsetService { /** Judge group & topic from KafkaConstants has exist. */ private boolean hasGroupTopic(String clusterAlias, String group, String topic) { - return kafkaService.findTopicAndGroupExist(clusterAlias, topic, group); + return kafkaService.findTopicExistInGroup(clusterAlias, topic, group); } /** Judge group & topic exist Kafka topic or KafkaConstants. */ diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/TopicServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/TopicServiceImpl.java index a6d104c..8a026a8 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/TopicServiceImpl.java @@ -20,6 +20,9 @@ package org.smartloli.kafka.eagle.web.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; import org.smartloli.kafka.eagle.web.config.KafkaClustersConfig; import org.smartloli.kafka.eagle.web.constant.KafkaConstants; import org.smartloli.kafka.eagle.web.constant.MBeanConstants; @@ -37,10 +40,12 @@ import org.smartloli.kafka.eagle.web.protocol.topic.TopicSqlHistory; import org.smartloli.kafka.eagle.web.service.*; import org.smartloli.kafka.eagle.web.sql.execute.KafkaSqlParser; import org.smartloli.kafka.eagle.web.util.DateUtils; +import org.smartloli.kafka.eagle.web.util.KafkaResourcePoolUtils; import org.smartloli.kafka.eagle.web.util.StrUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -57,39 +62,70 @@ import java.util.Map.Entry; * Update by hexiang 20170216 */ @Service +@Slf4j public class TopicServiceImpl implements TopicService { @Autowired private TopicDao topicDao; - /** Kafka service interface. */ @Autowired private KafkaService kafkaService; @Autowired private KafkaSqlParser kafkaSqlParser; - /** - * Kafka topic config service interface. - */ @Autowired private KafkaMetricsService kafkaMetricsService; - /** - * Broker service interface. - */ @Autowired private BrokerService brokerService; - /** - * Mx4j service interface. - */ @Autowired private Mx4jService mx4jService; @Autowired private KafkaClustersConfig kafkaClustersConfig; + @Override + public Map createTopic(String clusterAlias, String topicName, String partitions, String replica) { + Map targets = new HashMap<>(); + int brokers = kafkaService.getBrokerInfos(clusterAlias).size(); + if (Integer.parseInt(replica) > brokers) { + targets.put("status", "error"); + targets.put("info", "replication factor: " + replica + " larger than available brokers: " + brokers); + return targets; + } + + AdminClient adminClient = KafkaResourcePoolUtils.getKafkaClient(clusterAlias); + try { + NewTopic newTopic = new NewTopic(topicName, Integer.parseInt(partitions), Short.parseShort(replica)); + adminClient.createTopics(Collections.singleton(newTopic)).all().get(); + } catch (Exception e) { + log.info("Create kafka topic has error", e); + } finally { + KafkaResourcePoolUtils.release(clusterAlias, adminClient); + } + targets.put("status", "success"); + targets.put("info", "Create topic[" + topicName + "] has successed,partitions numbers is [" + partitions + "],replication-factor numbers is [" + replica + "]"); + return targets; + } + + @Override + public Map deleteTopic(String clusterAlias, String topicName) { + Map targets = new HashMap<>(); + AdminClient adminClient = KafkaResourcePoolUtils.getKafkaClient(clusterAlias); + try { + adminClient.deleteTopics(Collections.singleton(topicName)).all().get(); + targets.put("status", "success"); + } catch (Exception e) { + log.error("Delete kafka topic has error", e); + targets.put("status", "failed"); + } finally { + KafkaResourcePoolUtils.release(clusterAlias, adminClient); + } + return targets; + } + /** * Find topic name in all topics. */ diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 5654f85..db571e8 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -29,7 +29,7 @@ kafka.eagle.metrics.retain=30 kafka.eagle.clusters[0].alias=cluster1 kafka.eagle.clusters[0].zkList=10.101.72.43:2181,10.101.72.43:2182,10.101.72.43:2183 kafka.eagle.clusters[0].offsetStorage=kafka -kafka.eagle.clusters[0].brokerSize=20 +kafka.eagle.clusters[0].brokerSize=3 kafka.eagle.clusters[0].sasl.enable=true # #cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256 kafka.eagle.clusters[0].sasl.protocol=SASL_PLAINTEXT diff --git a/src/test/java/org/smartloli/kafka/eagle/service/KafkaServiceTest.java b/src/test/java/org/smartloli/kafka/eagle/service/KafkaServiceTest.java index e2c1e23..1f16ee4 100644 --- a/src/test/java/org/smartloli/kafka/eagle/service/KafkaServiceTest.java +++ b/src/test/java/org/smartloli/kafka/eagle/service/KafkaServiceTest.java @@ -111,4 +111,13 @@ public class KafkaServiceTest { List metadataInfos = kafkaService.findKafkaLeader("cluster1", "MSG_OUTBOUND_APP3331"); log.info("topic主题Leader消息 ==> {}", metadataInfos); } + + /** + * 获取主题分区复制分区信息 + */ + @Test + public void getTopicPartitionReplicasTest(){ + String result = kafkaService.getTopicPartitionReplicas("cluster1", "MSG_OUTBOUND_APP3331", 2); + Assert.assertNotNull(result); + } } diff --git a/src/test/java/org/smartloli/kafka/eagle/service/MetricsServiceTest.java b/src/test/java/org/smartloli/kafka/eagle/service/MetricsServiceTest.java new file mode 100644 index 0000000..6522751 --- /dev/null +++ b/src/test/java/org/smartloli/kafka/eagle/service/MetricsServiceTest.java @@ -0,0 +1,35 @@ +package org.smartloli.kafka.eagle.service; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.smartloli.kafka.eagle.web.KafkaEagleBootstrap; +import org.smartloli.kafka.eagle.web.service.MetricsService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * @author zhiwei_yang + * @time 2020-5-16-9:14 + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = KafkaEagleBootstrap.class) +@Slf4j +public class MetricsServiceTest { + + @Autowired + private MetricsService metricsService; + + private final String kafkaClusterName = "cluster1"; + + /** + * 获取所有节点监控信息 + */ + @Test + public void getAllBrokersMBeanTest(){ + String result= metricsService.getAllBrokersMBean(kafkaClusterName); + Assert.assertNotNull(result); + } +} -- Gitee From aaadf62022a319e6339c4b9f9b98655f48d4deb7 Mon Sep 17 00:00:00 2001 From: zhiwei_yang Date: Sat, 16 May 2020 10:41:04 +0800 Subject: [PATCH 04/28] =?UTF-8?q?getKafkaMeta=E6=8E=A5=E5=8F=A3=E6=94=B9?= =?UTF-8?q?=E9=80=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../eagle/web/constant/KafkaConstants.java | 7 ++ .../eagle/web/job/KafkaTopicMetricsJob.java | 14 ++-- .../eagle/web/service/ConsumerService.java | 12 +++ .../kafka/eagle/web/service/KafkaService.java | 18 ++--- .../web/service/impl/AlertServiceImpl.java | 11 ++- .../web/service/impl/ConsumerServiceImpl.java | 77 +++++++++++++++--- .../service/impl/DashboardServiceImpl.java | 2 +- .../web/service/impl/KafkaServiceImpl.java | 81 +++---------------- 8 files changed, 121 insertions(+), 101 deletions(-) diff --git a/src/main/java/org/smartloli/kafka/eagle/web/constant/KafkaConstants.java b/src/main/java/org/smartloli/kafka/eagle/web/constant/KafkaConstants.java index b4aa6c7..e9a6714 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/constant/KafkaConstants.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/constant/KafkaConstants.java @@ -90,6 +90,13 @@ public final class KafkaConstants { public static final String KAFKA_ZK_MIN_IDLE = "kafka.zk.min.idle"; public static final String KAFKA_ZK_MAX_IDLE = "kafka.zk.max.idle"; public static final String KAFKA_ZK_SESSION_TIMEOUT_MS = "kafka.zk.session.timeout.ms"; + public static final String BROKER_IDS_PATH = "/brokers/ids"; + public static final String BROKER_TOPICS_PATH = "/brokers/topics"; + public static final String DELETE_TOPICS_PATH = "/admin/delete_topics"; + public static final String CONSUMERS_PATH = "/consumers"; + public static final String OWNERS = "/owners"; + public static final String TOPIC_ISR = "/brokers/topics/%s/partitions/%s/state"; + @Value("${" + KAFKA_EAGLE_SQL_TOPIC_RECORDS_MAX + ":5000}") public static Long POSITION; diff --git a/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaTopicMetricsJob.java b/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaTopicMetricsJob.java index 47e72d1..786ab13 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaTopicMetricsJob.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/job/KafkaTopicMetricsJob.java @@ -27,6 +27,7 @@ import org.smartloli.kafka.eagle.web.config.SingleClusterConfig; import org.smartloli.kafka.eagle.web.constant.TopicConstants; import org.smartloli.kafka.eagle.web.protocol.bscreen.BScreenConsumerInfo; import org.smartloli.kafka.eagle.web.service.BrokerService; +import org.smartloli.kafka.eagle.web.service.ConsumerService; import org.smartloli.kafka.eagle.web.service.KafkaService; import org.smartloli.kafka.eagle.web.service.MetricsService; import org.smartloli.kafka.eagle.web.util.DateUtils; @@ -49,18 +50,15 @@ public class KafkaTopicMetricsJob { @Autowired private MetricsService metricsService; - /** - * Kafka service interface. - */ @Autowired private KafkaService kafkaService; - /** - * Broker service interface. - */ @Autowired private BrokerService brokerService; + @Autowired + private ConsumerService consumerService; + @Autowired private KafkaClustersConfig kafkaClustersConfig; @@ -69,7 +67,7 @@ public class KafkaTopicMetricsJob { List bscreenConsumers = new ArrayList<>(); for (SingleClusterConfig singleClusterConfig : kafkaClustersConfig.getClusters()) { if ("kafka".equals(singleClusterConfig.getOffsetStorage())) { - JSONArray consumerGroups = JSON.parseArray(kafkaService.getKafkaConsumer(singleClusterConfig.getAlias())); + JSONArray consumerGroups = JSON.parseArray(consumerService.getKafkaConsumer(singleClusterConfig.getAlias())); for (Object object : consumerGroups) { JSONObject consumerGroup = (JSONObject) object; String group = consumerGroup.getString("group"); @@ -138,7 +136,7 @@ public class KafkaTopicMetricsJob { } } } else { - Map> consumerGroups = kafkaService.getConsumers(singleClusterConfig.getAlias()); + Map> consumerGroups = consumerService.getConsumers(singleClusterConfig.getAlias()); for (Entry> entry : consumerGroups.entrySet()) { String group = entry.getKey(); for (String topic : kafkaService.findActiveTopics(singleClusterConfig.getAlias(), group)) { diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/ConsumerService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/ConsumerService.java index 874aa99..5859696 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/ConsumerService.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/ConsumerService.java @@ -19,6 +19,9 @@ package org.smartloli.kafka.eagle.web.service; import org.smartloli.kafka.eagle.web.protocol.DisplayInfo; +import java.util.List; +import java.util.Map; + /** * Kafka consumer data interface. * @@ -30,6 +33,15 @@ import org.smartloli.kafka.eagle.web.protocol.DisplayInfo; */ public interface ConsumerService { + /** + * 获取消费者信息 + * @param clusterAlias kafka集群名称 + */ + Map> getConsumers(String clusterAlias); + + /** Get kafka 0.10.x consumer group & topic information. */ + String getKafkaConsumer(String clusterAlias); + /** * Get active topic graph data interface. */ diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java index b6156b9..034776c 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java @@ -17,6 +17,7 @@ */ package org.smartloli.kafka.eagle.web.service; +import com.alibaba.fastjson.JSONArray; import org.apache.kafka.common.TopicPartition; import org.smartloli.kafka.eagle.web.protocol.*; @@ -83,12 +84,6 @@ public interface KafkaService { */ String getBrokerJMXFromIds(String clusterAlias, int ids); - /** - * 从Zookeeper获取消费者信息 - * @param clusterAlias kafka集群名称 - */ - Map> getConsumers(String clusterAlias); - /** * 从Zookeeper获取分页消费者信息 * @@ -128,13 +123,18 @@ public interface KafkaService { /** Get kafka 0.10.x consumer topic, maybe consumer topic owner is null. */ Set getKafkaConsumerTopics(String clusterAlias, String group); - - /** Get kafka 0.10.x consumer group & topic information. */ - String getKafkaConsumer(String clusterAlias); /** Get kafka 0.10.x consumer group & topic information used for page. */ String getKafkaConsumer(String clusterAlias, DisplayInfo displayInfo); + /** + * 获取Kafka元数据信息 + * @param clusterAlias kafka集群名称 + * @param group 消费组名称 + * @return + */ + JSONArray getKafkaMetadata(String clusterAlias, String group); + @Deprecated /** Get kafka consumer information pages. */ String getKafkaActiverSize(String clusterAlias, String group); diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/AlertServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/AlertServiceImpl.java index a03492e..ba50801 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/AlertServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/AlertServiceImpl.java @@ -28,6 +28,7 @@ import org.smartloli.kafka.eagle.web.protocol.alarm.AlarmConfigInfo; import org.smartloli.kafka.eagle.web.protocol.alarm.AlarmConsumerInfo; import org.smartloli.kafka.eagle.web.protocol.topic.TopicLogSize; import org.smartloli.kafka.eagle.web.service.AlertService; +import org.smartloli.kafka.eagle.web.service.ConsumerService; import org.smartloli.kafka.eagle.web.service.KafkaService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -49,10 +50,12 @@ import java.util.regex.Pattern; @Service public class AlertServiceImpl implements AlertService { - /** Kafka service interface. */ @Autowired private KafkaService kafkaService; + @Autowired + private ConsumerService consumerService; + @Autowired private AlertDao alertDao; @@ -83,7 +86,7 @@ public class AlertServiceImpl implements AlertService { } private String getAlarmConsumerGroup(String clusterAlias, String search) { - Map> consumers = kafkaService.getConsumers(clusterAlias); + Map> consumers = consumerService.getConsumers(clusterAlias); JSONArray groups = new JSONArray(); int offset = 0; if (search.length() > 0) { @@ -111,7 +114,7 @@ public class AlertServiceImpl implements AlertService { private String getAlarmConsumerGroupKafka(String clusterAlias, String search) { int offset = 0; JSONArray groups = new JSONArray(); - JSONArray consumerGroups = JSON.parseArray(kafkaService.getKafkaConsumer(clusterAlias)); + JSONArray consumerGroups = JSON.parseArray(consumerService.getKafkaConsumer(clusterAlias)); if (search.length() > 0) { for (Object object : consumerGroups) { JSONObject consumerGroup = (JSONObject) object; @@ -137,7 +140,7 @@ public class AlertServiceImpl implements AlertService { } private String getAlarmConsumerTopic(String clusterAlias, String group, String search) { - Map> consumers = kafkaService.getConsumers(clusterAlias); + Map> consumers = consumerService.getConsumers(clusterAlias); JSONArray topics = new JSONArray(); int offset = 0; if (search.length() > 0) { diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/ConsumerServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/ConsumerServiceImpl.java index 4c4ca7f..35e53ac 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/ConsumerServiceImpl.java @@ -20,10 +20,13 @@ package org.smartloli.kafka.eagle.web.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import kafka.zk.KafkaZkClient; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ConsumerGroupListing; -import org.smartloli.kafka.eagle.web.config.KafkaClustersConfig; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; +import org.apache.kafka.clients.admin.ListConsumerGroupsResult; +import org.apache.kafka.common.Node; import org.smartloli.kafka.eagle.web.constant.KafkaConstants; import org.smartloli.kafka.eagle.web.constant.TopicConstants; import org.smartloli.kafka.eagle.web.dao.MBeanDao; @@ -38,6 +41,9 @@ import org.smartloli.kafka.eagle.web.service.KafkaService; import org.smartloli.kafka.eagle.web.util.KafkaResourcePoolUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import scala.collection.JavaConversions; +import scala.collection.Seq; import java.util.*; import java.util.Map.Entry; @@ -61,14 +67,67 @@ public class ConsumerServiceImpl implements ConsumerService { @Autowired private TopicDao topicDao; - /** - * Kafka service interface. - */ @Autowired private KafkaService kafkaService; - @Autowired - private KafkaClustersConfig kafkaClustersConfig; + @Override + public Map> getConsumers(String clusterAlias) { + KafkaZkClient kafkaZkClient = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); + Map> consumers = new HashMap<>(); + try { + Seq subConsumerPaths = kafkaZkClient.getChildren(KafkaConstants.CONSUMERS_PATH); + List groups = JavaConversions.seqAsJavaList(subConsumerPaths); + for (String group : groups) { + String path = KafkaConstants.CONSUMERS_PATH + "/" + group + "/owners"; + if (kafkaZkClient.pathExists(path)) { + Seq owners = kafkaZkClient.getChildren(path); + List ownersSerialize = JavaConversions.seqAsJavaList(owners); + consumers.put(group, ownersSerialize); + } else { + log.error("Consumer Path[" + path + "] is not exist."); + } + } + log.info("查询kafka集群[{}]消费者信息 ==> {}", clusterAlias, consumers); + return consumers; + } finally { + KafkaResourcePoolUtils.release(clusterAlias, kafkaZkClient); + } + } + + /** Get kafka 0.10.x, 1.x, 2.x consumer metadata. */ + @Override + public String getKafkaConsumer(String clusterAlias) { + JSONArray consumerGroups = new JSONArray(); + AdminClient adminClient = KafkaResourcePoolUtils.getKafkaClient(clusterAlias); + try { + ListConsumerGroupsResult cgrs = adminClient.listConsumerGroups(); + Collection consumerGroupListings = cgrs.all().get(); + if (!CollectionUtils.isEmpty(consumerGroupListings)) { + for (ConsumerGroupListing consumerGroupListing : consumerGroupListings) { + JSONObject consumerGroup = new JSONObject(); + String groupId = consumerGroupListing.groupId(); + DescribeConsumerGroupsResult descConsumerGroup = adminClient.describeConsumerGroups(Arrays.asList(groupId)); + if (!groupId.contains("kafka.eagle")) { + consumerGroup.put("group", groupId); + try { + Node node = descConsumerGroup.all().get().get(groupId).coordinator(); + consumerGroup.put("node", node.host() + ":" + node.port()); + } catch (Exception e) { + log.error("Get coordinator node has error, msg is " + e.getMessage()); + e.printStackTrace(); + } + consumerGroup.put("meta", kafkaService.getKafkaMetadata(clusterAlias, groupId)); + consumerGroups.add(consumerGroup); + } + } + } + } catch (Exception e) { + log.error("Get kafka consumer has error", e); + } finally { + KafkaResourcePoolUtils.release(clusterAlias, adminClient); + } + return consumerGroups.toJSONString(); + } /** * Get active topic graph data from kafka cluster. @@ -174,7 +233,7 @@ public class ConsumerServiceImpl implements ConsumerService { if ("kafka".equals(storeFormat)) { return this.getKafkaConsumerGroups(clusterAlias); } else { - return kafkaService.getConsumers(clusterAlias).size(); + return this.getConsumers(clusterAlias).size(); } } @@ -223,7 +282,7 @@ public class ConsumerServiceImpl implements ConsumerService { /** List the name of the topic in the consumer detail information. */ private String getConsumerDetail(String clusterAlias, String group) { - Map> consumers = kafkaService.getConsumers(clusterAlias); + Map> consumers = this.getConsumers(clusterAlias); Map> actvTopics = kafkaService.findActiveTopics(clusterAlias); List kafkaConsumerDetails = new ArrayList(); int id = 0; @@ -252,7 +311,7 @@ public class ConsumerServiceImpl implements ConsumerService { /** Get active grahp data & storage offset in kafka topic. */ private Object getKafkaActive(String clusterAlias) { - JSONArray consumerGroups = JSON.parseArray(kafkaService.getKafkaConsumer(clusterAlias)); + JSONArray consumerGroups = JSON.parseArray(this.getKafkaConsumer(clusterAlias)); JSONObject target = new JSONObject(); JSONArray targets = new JSONArray(); target.put("name", "Active Topics"); diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/DashboardServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/DashboardServiceImpl.java index 89fc27f..5b13891 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/DashboardServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/DashboardServiceImpl.java @@ -77,7 +77,7 @@ public class DashboardServiceImpl implements DashboardService { * Get consumer number from zookeeper. */ private int getConsumerNumbers(String clusterAlias) { - Map> consumers = kafkaService.getConsumers(clusterAlias); + Map> consumers = consumerService.getConsumers(clusterAlias); int count = 0; for (Entry> entry : consumers.entrySet()) { count += entry.getValue().size(); diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaServiceImpl.java index 071c43f..bc08f3c 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaServiceImpl.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/KafkaServiceImpl.java @@ -43,7 +43,6 @@ import org.smartloli.kafka.eagle.web.util.JMXFactoryUtils; import org.smartloli.kafka.eagle.web.util.KafkaResourcePoolUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import scala.Option; import scala.Tuple2; @@ -223,30 +222,6 @@ public class KafkaServiceImpl implements KafkaService { return clusterBrokerInfoMap; } - @Override - public Map> getConsumers(String clusterAlias) { - KafkaZkClient kafkaZkClient = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); - Map> consumers = new HashMap<>(); - try { - Seq subConsumerPaths = kafkaZkClient.getChildren(CONSUMERS_PATH); - List groups = JavaConversions.seqAsJavaList(subConsumerPaths); - for (String group : groups) { - String path = CONSUMERS_PATH + "/" + group + "/owners"; - if (kafkaZkClient.pathExists(path)) { - Seq owners = kafkaZkClient.getChildren(path); - List ownersSerialize = JavaConversions.seqAsJavaList(owners); - consumers.put(group, ownersSerialize); - } else { - log.error("Consumer Path[" + path + "] is not exist."); - } - } - } finally { - KafkaResourcePoolUtils.release(clusterAlias, kafkaZkClient); - } - log.info("查询kafka集群[{}]消费者信息 ==> {}", clusterAlias, consumers); - return consumers; - } - @Override public Map> getConsumers(String clusterAlias, DisplayInfo displayInfo) { KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias); @@ -450,7 +425,7 @@ public class KafkaServiceImpl implements KafkaService { /** Get kafka 0.10.x after activer topics. */ public Set getKafkaActiverTopics(String clusterAlias, String group) { - JSONArray consumerGroups = getKafkaMetadata(parseBrokerServer(clusterAlias), group, clusterAlias); + JSONArray consumerGroups = getKafkaMetadata(clusterAlias, group); Set topics = new HashSet<>(); for (Object object : consumerGroups) { JSONObject consumerGroup = (JSONObject) object; @@ -465,7 +440,7 @@ public class KafkaServiceImpl implements KafkaService { } public Set getKafkaConsumerTopics(String clusterAlias, String group) { - JSONArray consumerGroups = getKafkaMetadata(parseBrokerServer(clusterAlias), group, clusterAlias); + JSONArray consumerGroups = getKafkaMetadata(clusterAlias, group); Set topics = new HashSet<>(); for (Object object : consumerGroups) { JSONObject consumerGroup = (JSONObject) object; @@ -477,40 +452,6 @@ public class KafkaServiceImpl implements KafkaService { return topics; } - /** Get kafka 0.10.x, 1.x, 2.x consumer metadata. */ - public String getKafkaConsumer(String clusterAlias) { - JSONArray consumerGroups = new JSONArray(); - AdminClient adminClient = KafkaResourcePoolUtils.getKafkaClient(clusterAlias); - try { - ListConsumerGroupsResult cgrs = adminClient.listConsumerGroups(); - Collection consumerGroupListings = cgrs.all().get(); - if (!CollectionUtils.isEmpty(consumerGroupListings)) { - for (ConsumerGroupListing consumerGroupListing : consumerGroupListings) { - JSONObject consumerGroup = new JSONObject(); - String groupId = consumerGroupListing.groupId(); - DescribeConsumerGroupsResult descConsumerGroup = adminClient.describeConsumerGroups(Arrays.asList(groupId)); - if (!groupId.contains("kafka.eagle")) { - consumerGroup.put("group", groupId); - try { - Node node = descConsumerGroup.all().get().get(groupId).coordinator(); - consumerGroup.put("node", node.host() + ":" + node.port()); - } catch (Exception e) { - log.error("Get coordinator node has error, msg is " + e.getMessage()); - e.printStackTrace(); - } - consumerGroup.put("meta", getKafkaMetadata(parseBrokerServer(clusterAlias), groupId, clusterAlias)); - consumerGroups.add(consumerGroup); - } - } - } - } catch (Exception e) { - log.error("Get kafka consumer has error", e); - } finally { - KafkaResourcePoolUtils.release(clusterAlias, adminClient); - } - return consumerGroups.toJSONString(); - } - /** Get kafka 0.10.x consumer group & topic information used for page. */ @Override public String getKafkaConsumer(String clusterAlias, DisplayInfo displayInfo) { @@ -537,7 +478,7 @@ public class KafkaServiceImpl implements KafkaService { log.error("Get coordinator node has error, msg is " + e.getMessage()); e.printStackTrace(); } - consumerGroup.put("meta", getKafkaMetadata(KafkaResourcePoolUtils.getBootstrapServer(clusterAlias), groupId, clusterAlias)); + consumerGroup.put("meta", getKafkaMetadata(clusterAlias, groupId)); consumerGroups.add(consumerGroup); } offset++; @@ -565,7 +506,7 @@ public class KafkaServiceImpl implements KafkaService { log.error("Get coordinator node has error, msg is " + e.getMessage()); e.printStackTrace(); } - consumerGroup.put("meta", getKafkaMetadata(KafkaResourcePoolUtils.getBootstrapServer(clusterAlias), groupId, clusterAlias)); + consumerGroup.put("meta", getKafkaMetadata(clusterAlias, groupId)); consumerGroups.add(consumerGroup); } offset++; @@ -585,12 +526,12 @@ public class KafkaServiceImpl implements KafkaService { } /** Get kafka 0.10.x consumer metadata. */ - private JSONArray getKafkaMetadata(String bootstrapServers, String group, String clusterAlias) { - Properties prop = new Properties(); + @Override + public JSONArray getKafkaMetadata(String clusterAlias, String group) { JSONArray consumerGroups = new JSONArray(); AdminClient adminClient = KafkaResourcePoolUtils.getKafkaClient(clusterAlias); try { - DescribeConsumerGroupsResult descConsumerGroup = adminClient.describeConsumerGroups(Arrays.asList(group)); + DescribeConsumerGroupsResult descConsumerGroup = adminClient.describeConsumerGroups(Collections.singletonList(group)); Collection consumerMetaInfos = descConsumerGroup.describedGroups().get(group).get().members(); Set hasOwnerTopics = new HashSet<>(); if (consumerMetaInfos.size() > 0) { @@ -636,7 +577,7 @@ public class KafkaServiceImpl implements KafkaService { /** Get kafka 0.10.x consumer pages. */ public String getKafkaActiverSize(String clusterAlias, String group) { - JSONArray consumerGroups = getKafkaMetadata(parseBrokerServer(clusterAlias), group, clusterAlias); + JSONArray consumerGroups = getKafkaMetadata(clusterAlias, group); int activerCounter = 0; Set topics = new HashSet<>(); for (Object object : consumerGroups) { @@ -658,7 +599,7 @@ public class KafkaServiceImpl implements KafkaService { /** Get kafka consumer information pages not owners. */ public OwnerInfo getKafkaActiveNotOwners(String clusterAlias, String group) { OwnerInfo ownerInfo = new OwnerInfo(); - JSONArray consumerGroups = getKafkaMetadata(parseBrokerServer(clusterAlias), group, clusterAlias); + JSONArray consumerGroups = getKafkaMetadata(clusterAlias, group); int activerCounter = 0; Set topics = new HashSet<>(); for (Object object : consumerGroups) { @@ -678,7 +619,7 @@ public class KafkaServiceImpl implements KafkaService { /** Get kafka 0.10.x, 1.x, 2.x consumer topic information. */ public Set getKafkaConsumerTopic(String clusterAlias, String group) { - JSONArray consumerGroups = getKafkaMetadata(parseBrokerServer(clusterAlias), group, clusterAlias); + JSONArray consumerGroups = getKafkaMetadata(clusterAlias, group); Set topics = new HashSet<>(); for (Object object : consumerGroups) { JSONObject consumerGroup = (JSONObject) object; @@ -692,7 +633,7 @@ public class KafkaServiceImpl implements KafkaService { /** Get kafka 0.10.x consumer group and topic. */ public String getKafkaConsumerGroupTopic(String clusterAlias, String group) { - return getKafkaMetadata(parseBrokerServer(clusterAlias), group, clusterAlias).toJSONString(); + return getKafkaMetadata(clusterAlias, group).toJSONString(); } /** Get kafka 0.10.x, 1.x, 2.x offset from topic. */ -- Gitee From 329daace06a005e09dd611b19a46aea936b8a485 Mon Sep 17 00:00:00 2001 From: zhiwei_yang Date: Sat, 16 May 2020 10:55:48 +0800 Subject: [PATCH 05/28] =?UTF-8?q?=E5=89=8D=E7=AB=AF=E9=A1=B5=E9=9D=A2?= =?UTF-8?q?=E5=AF=BC=E8=88=AA=E6=A0=8F=E8=A1=A5=E5=85=85=E9=A6=96=E9=A1=B5?= =?UTF-8?q?=E8=B7=B3=E8=BD=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartloli/kafka/eagle/web/constant/HttpConstants.java | 6 ++++-- .../kafka/eagle/web/controller/AccountController.java | 8 -------- .../kafka/eagle/web/controller/PageController.java | 2 +- src/main/webapp/WEB-INF/views/public/navbar.jsp | 6 ++++-- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/smartloli/kafka/eagle/web/constant/HttpConstants.java b/src/main/java/org/smartloli/kafka/eagle/web/constant/HttpConstants.java index 6136411..22ce23c 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/constant/HttpConstants.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/constant/HttpConstants.java @@ -7,13 +7,15 @@ package org.smartloli.kafka.eagle.web.constant; */ public class HttpConstants { + /** 根路径 **/ + public final static String ROOT_URL = "/"; + /**登陆页**/ - public final static String LOGIN_URL = "/"; + public final static String LOGIN_URL = "/account/signin"; /**登陆逻辑**/ public final static String LOGIN_ACTION_URL = "/account/signin/action"; - /** 退出登陆 **/ public final static String LOGOUT_URL = "/account/signout"; diff --git a/src/main/java/org/smartloli/kafka/eagle/web/controller/AccountController.java b/src/main/java/org/smartloli/kafka/eagle/web/controller/AccountController.java index b15f4dc..7547289 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/controller/AccountController.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/controller/AccountController.java @@ -26,7 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.core.annotation.AuthenticationPrincipal; import org.springframework.security.core.userdetails.UserDetails; import org.springframework.stereotype.Controller; -import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -46,13 +45,6 @@ public class AccountController { @Autowired private UserInfoService userInfoService; - /** 跳转登陆页 */ - @GetMapping("/signin") - @ApiOperation("跳转登陆页") - public String signin() { - return "/account/signin"; - } - /** 重置密码 */ @PostMapping(value = "/resetPassword") @ApiOperation("重置密码") diff --git a/src/main/java/org/smartloli/kafka/eagle/web/controller/PageController.java b/src/main/java/org/smartloli/kafka/eagle/web/controller/PageController.java index b7fe29b..a96c65d 100644 --- a/src/main/java/org/smartloli/kafka/eagle/web/controller/PageController.java +++ b/src/main/java/org/smartloli/kafka/eagle/web/controller/PageController.java @@ -40,7 +40,7 @@ public class PageController { * 跳转面板主页 * @return */ - @GetMapping(HttpConstants.INDEX_URL) + @GetMapping({HttpConstants.INDEX_URL, HttpConstants.ROOT_URL}) @ApiOperation("跳转主页") public String toIndex() { return "/main/index"; diff --git a/src/main/webapp/WEB-INF/views/public/navbar.jsp b/src/main/webapp/WEB-INF/views/public/navbar.jsp index 896edeb..37832a3 100644 --- a/src/main/webapp/WEB-INF/views/public/navbar.jsp +++ b/src/main/webapp/WEB-INF/views/public/navbar.jsp @@ -5,8 +5,10 @@