> entry : consumerGroups.entrySet()) {
String group = entry.getKey();
- for (String topic : kafkaService.getActiveTopic(singleClusterConfig.getAlias(), group)) {
+ for (String topic : kafkaService.findActiveTopics(singleClusterConfig.getAlias(), group)) {
BScreenConsumerInfo bscreenConsumer = new BScreenConsumerInfo();
bscreenConsumer.setCluster(singleClusterConfig.getAlias());
bscreenConsumer.setGroup(group);
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/job/TopicRankJob.java b/src/main/java/org/smartloli/kafka/eagle/web/job/TopicRankJob.java
index bd54f1ef6844ce445a5b063b6a9b7d5551ee0c62..b1e4b32048fb968204aff6eb5b5ea024bfb103c3 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/job/TopicRankJob.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/job/TopicRankJob.java
@@ -59,7 +59,7 @@ public class TopicRankJob {
@Autowired
private KafkaClustersConfig kafkaClustersConfig;
- @Scheduled(cron = "0 0/5 * * * ?")
+ @Scheduled(cron = "0 */10 * * * ?")
protected void execute() {
topicLogSizeStats();
topicCapacityStats();
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/pojo/UserInfo.java b/src/main/java/org/smartloli/kafka/eagle/web/pojo/UserInfo.java
index be51896ac47996a2971e6631b5221d906e33b1f8..0e933931a1dc52c7c2b540afa3003b0aff1d1b63 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/pojo/UserInfo.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/pojo/UserInfo.java
@@ -31,10 +31,13 @@ import lombok.Data;
public class UserInfo {
/**
- * 用户id
+ * 数据库ID
*/
private int id;
+ /**
+ * 用户ID
+ */
private int rtxno;
/**
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/protocol/DashboardInfo.java b/src/main/java/org/smartloli/kafka/eagle/web/protocol/DashboardInfo.java
index 1181d250e1b61ae9dcc9a0f99f65908d4ad057ee..208d1f77a15be4ee51716123ed7eda34a2a1e383 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/protocol/DashboardInfo.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/protocol/DashboardInfo.java
@@ -20,10 +20,8 @@ package org.smartloli.kafka.eagle.web.protocol;
import lombok.Data;
/**
- * Definition dashboard information.
- *
+ * 面板信息
* @author smartloli.
- *
* Created by Aug 13, 2016
*/
@Data
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/protocol/PartitionsInfo.java b/src/main/java/org/smartloli/kafka/eagle/web/protocol/PartitionsInfo.java
index 86788369eb1e04c359d876d387e3f96c05167779..2f9d3a3872792d99a5b88053c3dc1291a4cb8467 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/protocol/PartitionsInfo.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/protocol/PartitionsInfo.java
@@ -44,13 +44,27 @@ public class PartitionsInfo{
/** 分区数量 **/
private int partitionNumbers = 0;
+
+ /** Broker倾斜数 **/
private long brokersSkewed;
+
+ /** Broker 使用率 **/
private long brokersSpread;
+
+ /**
+ * 主分区是否倾斜
+ */
private long brokersLeaderSkewed;
+ /**
+ * 创建时间
+ */
@JSONField(format = DateUtils.DATA_FORMAT_YEAR_MON_DAY_HOUR_MIN_SEC)
private Date created;
+ /**
+ * 更新时间
+ */
@JSONField(format = DateUtils.DATA_FORMAT_YEAR_MON_DAY_HOUR_MIN_SEC)
private Date modify;
}
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/protocol/TopicConsumerInfo.java b/src/main/java/org/smartloli/kafka/eagle/web/protocol/TopicConsumerInfo.java
index 3309d38b1fee572a8212f44d5bc0734ca5f56fd4..b3f77f29003f937d9de485a93f4bff8e23b298f5 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/protocol/TopicConsumerInfo.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/protocol/TopicConsumerInfo.java
@@ -20,16 +20,19 @@ package org.smartloli.kafka.eagle.web.protocol;
import lombok.Data;
/**
- * Definition Kafka consumer detail information.
- *
+ * 主题消费者信息
* @author smartloli.
- *
* Created by Aug 16, 2016
*/
@Data
public class TopicConsumerInfo {
+ /** 消费者ID **/
private int id;
+
+ /** 消费者主题名称 **/
private String topic;
- private int consumering;
+
+ /** 是否正在消费 **/
+ private int consuming;
}
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/security/DefineAuthenticationFailureHandler.java b/src/main/java/org/smartloli/kafka/eagle/web/security/DefineAuthenticationFailureHandler.java
index 33c14df85bbc2b1a57718c93a214d96ddee7329e..21dd76f85deb70a874acb249f5a83776104451e4 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/security/DefineAuthenticationFailureHandler.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/security/DefineAuthenticationFailureHandler.java
@@ -24,8 +24,9 @@ public class DefineAuthenticationFailureHandler implements AuthenticationFailure
@Override
public void onAuthenticationFailure(HttpServletRequest request, HttpServletResponse response, AuthenticationException exception) throws IOException, ServletException {
log.error("用户登陆认证失败", exception);
- HttpSession httpSession = request.getSession();
- httpSession.setAttribute(KafkaConstants.ERROR_LOGIN, "
Account or password is error .
");
+ HttpSession httpSession = request.getSession(false);
+ httpSession.setAttribute(KafkaConstants.ERROR_DISPLAY, true);
+ httpSession.setAttribute(KafkaConstants.ERROR_LOGIN, "Account or password is error .");
response.sendRedirect(HttpConstants.LOGIN_URL);
}
}
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/security/DefineAuthenticationSuccessHandler.java b/src/main/java/org/smartloli/kafka/eagle/web/security/DefineAuthenticationSuccessHandler.java
index 0bd09892a46b202d7ef06093da6e88082d9cb8b3..49949167455489fb1a7961e98ab63c7a36144fdd 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/security/DefineAuthenticationSuccessHandler.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/security/DefineAuthenticationSuccessHandler.java
@@ -5,6 +5,7 @@ import org.smartloli.kafka.eagle.web.config.KafkaClustersConfig;
import org.smartloli.kafka.eagle.web.constant.HttpConstants;
import org.smartloli.kafka.eagle.web.constant.KafkaConstants;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.userdetails.User;
@@ -28,6 +29,9 @@ import java.util.stream.Collectors;
@Slf4j
public class DefineAuthenticationSuccessHandler implements AuthenticationSuccessHandler {
+ @Value("${kafka.eagle.version:}")
+ private String version;
+
@Autowired
private KafkaClustersConfig kafkaClustersConfig;
@@ -38,6 +42,8 @@ public class DefineAuthenticationSuccessHandler implements AuthenticationSuccess
User user = (User) authentication.getPrincipal();
log.info("用户【{}】认证成功", user.getUsername());
HttpSession httpSession = request.getSession();
+ httpSession.setAttribute(KafkaConstants.LOGIN_USER_NAME, user.getUsername());
+ httpSession.setAttribute(KafkaConstants.SYSTEM_VERSION, version);
Collection grantedAuthorityList = (Collection) authentication.getAuthorities();
List grantedAuthorities = grantedAuthorityList.stream().map(GrantedAuthority::getAuthority).collect(Collectors.toList());
@@ -49,21 +55,11 @@ public class DefineAuthenticationSuccessHandler implements AuthenticationSuccess
Object object = httpSession.getAttribute(KafkaConstants.CLUSTER_ALIAS);
if (object == null) {
- List clusterAliasArray = kafkaClustersConfig.getClusterAllAlias();
- String defaultClusterAlias = clusterAliasArray.get(0);
+ List clusterAliasList = kafkaClustersConfig.getClusterAllAlias();
+ String defaultClusterAlias = clusterAliasList.get(0);
httpSession.setAttribute(KafkaConstants.CLUSTER_ALIAS, defaultClusterAlias);
-
- //kafka集群下来列表显示
- StringBuilder dropList = new StringBuilder("");
- httpSession.setAttribute(KafkaConstants.CLUSTER_ALIAS_LIST, dropList.toString());
+ clusterAliasList.remove(defaultClusterAlias);
+ httpSession.setAttribute(KafkaConstants.CLUSTER_ALIAS_LIST, clusterAliasList);
}
response.sendRedirect(HttpConstants.INDEX_URL);
}
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/security/DefineLogoutSuccessHandler.java b/src/main/java/org/smartloli/kafka/eagle/web/security/DefineLogoutSuccessHandler.java
index 02025a1f5d73e0eea4b6dbb53f2ca34be5f8f3b4..cf1cb6d329cbf14c3711855d371b814da0379e89 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/security/DefineLogoutSuccessHandler.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/security/DefineLogoutSuccessHandler.java
@@ -7,7 +7,6 @@ import org.springframework.security.core.userdetails.User;
import org.springframework.security.web.authentication.logout.LogoutSuccessHandler;
import org.springframework.stereotype.Component;
-import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
@@ -23,8 +22,10 @@ import java.io.IOException;
@Slf4j
public class DefineLogoutSuccessHandler implements LogoutSuccessHandler {
@Override
- public void onLogoutSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException, ServletException {
- log.info("用户【{}】退出登陆成功", ((User)authentication.getPrincipal()).getUsername());
+ public void onLogoutSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException {
+ if(null != authentication.getPrincipal()) {
+ log.info("用户【{}】退出登陆成功", ((User) authentication.getPrincipal()).getUsername());
+ }
request.getSession().invalidate();
response.sendRedirect(HttpConstants.LOGIN_URL);
}
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/security/KafkaEagleUserDetailsService.java b/src/main/java/org/smartloli/kafka/eagle/web/security/KafkaEagleUserDetailsService.java
index e1e27dce8f14e80e36cbc566d9dd35ca9196cfd4..53bc4959203cae37da40ea19edbb3a8db6244a40 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/security/KafkaEagleUserDetailsService.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/security/KafkaEagleUserDetailsService.java
@@ -11,11 +11,11 @@ import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.core.userdetails.UsernameNotFoundException;
-import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -31,9 +31,6 @@ public class KafkaEagleUserDetailsService implements UserDetailsService {
@Autowired
private UserInfoService userInfoService;
- @Autowired
- private PasswordEncoder passwordEncoder;
-
/**
* 获取用户名绑定用户信息
* @param username 用户名
@@ -47,6 +44,8 @@ public class KafkaEagleUserDetailsService implements UserDetailsService {
User.UserBuilder userBuilder = User.builder().username(username);
UserInfo userInfo = userInfoService.queryUserByName(username);
if(null == userInfo){
+ userBuilder.password(null);
+ userBuilder.authorities(Collections.emptyList());
return userBuilder.build();
}
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/BrokerService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/BrokerService.java
index 8d05e7301c6bb4afc536177330ff5a3f186df6f7..8ac9f49d5e444e85f833b13850977903a8b7147e 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/service/BrokerService.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/service/BrokerService.java
@@ -63,10 +63,20 @@ public interface BrokerService {
/** Get topic producer logsize total. */
long getTopicLogSizeTotal(String clusterAlias, String topic);
- /** Get topic real logsize records. */
+ /**
+ * 获取真实Kafka主题日志数据量
+ * @param clusterAlias kafka集群名称
+ * @param topic 主题名称
+ * @return
+ */
long getTopicRealLogSize(String clusterAlias, String topic);
- /** Get topic producer send logsize records. */
+ /**
+ * 获取kafka集群生产者发送主题日志数量
+ * @param clusterAlias kafka集群名称
+ * @param topic 主题名称
+ * @return
+ */
long getTopicProducerLogSize(String clusterAlias, String topic);
/** Add topic partitions. */
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/ConsumerService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/ConsumerService.java
index 3233198a9a568dfc4016c31bc030abe32a37ca53..f435400e6dc3e5e06b06f474d88d885fcfb56b60 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/service/ConsumerService.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/service/ConsumerService.java
@@ -18,6 +18,10 @@
package org.smartloli.kafka.eagle.web.service;
import org.smartloli.kafka.eagle.web.protocol.DisplayInfo;
+import org.smartloli.kafka.eagle.web.protocol.TopicConsumerInfo;
+
+import java.util.List;
+import java.util.Map;
/**
* Kafka consumer data interface.
@@ -30,6 +34,15 @@ import org.smartloli.kafka.eagle.web.protocol.DisplayInfo;
*/
public interface ConsumerService {
+ /**
+ * 获取消费者信息
+ * @param clusterAlias kafka集群名称
+ */
+ Map> getConsumers(String clusterAlias);
+
+ /** Get kafka 0.10.x consumer group & topic information. */
+ String getKafkaConsumer(String clusterAlias);
+
/**
* Get active topic graph data interface.
*/
@@ -44,7 +57,7 @@ public interface ConsumerService {
* Judge consumer detail information storage offset in kafka or zookeeper
* interface.
*/
- String getConsumerDetail(String clusterAlias, String formatter, String group);
+ List getConsumerDetail(String clusterAlias, String formatter, String group);
/**
* Judge consumers storage offset in kafka or zookeeper interface.
@@ -71,7 +84,6 @@ public interface ConsumerService {
*/
int isConsumering(String clusterAlias, String group, String topic);
- /** DB storage */
/** Get consumer group count by db. */
long getConsumerCountByDB(String clusterAlias);
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java
index 52680b14da2a2b9930b60322fb515af3e0cb6823..034776cbad13254243519716dcb6af46dbc1dcd1 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/service/KafkaService.java
@@ -17,6 +17,7 @@
*/
package org.smartloli.kafka.eagle.web.service;
+import com.alibaba.fastjson.JSONArray;
import org.apache.kafka.common.TopicPartition;
import org.smartloli.kafka.eagle.web.protocol.*;
@@ -26,48 +27,57 @@ import java.util.Set;
/**
* Kafka group,topic and partition interface.
- *
* @author smartloli.
- *
- * Created by Jan 18, 2017
- *
- * Update by hexiang 20170216
+ * Created by Jan 18, 2017
+ * Update by hexiang 20170216
*/
public interface KafkaService {
- /** Find topic and group exist in zookeeper. */
- boolean findTopicAndGroupExist(String clusterAlias, String topic, String group);
+ /**
+ * 查询消费组是否消费指定主题
+ * @param clusterAlias kafka集群名称
+ * @param topic 主题名称
+ * @param consumerGroup 消费组名称
+ * @return
+ */
+ boolean findTopicExistInGroup(String clusterAlias, String topic, String consumerGroup);
/**
- * Obtaining metadata in zookeeper by topic.
+ * 获取主题分区信息
+ * @param clusterAlias 集群名
+ * @param topic 主题名称
+ * @return List.
*/
List findTopicPartition(String clusterAlias, String topic);
/**
- * Get kafka active consumer topic.
+ * 获取活跃kafka消费者主题信息
+ * @param clusterName kafka系群名
*/
- Map> getActiveTopic(String clusterAlias);
+ Map> findActiveTopics(String clusterName);
/**
- * Get kafka active consumer topic.
+ * 获取kafka集群消费组订阅的活跃主题信息
+ * @param clusterAlias kafka集群名称
+ * @param consumerGroup 消费组
+ * @return
*/
- Set getActiveTopic(String clusterAlias, String group);
+ Set findActiveTopics(String clusterAlias, String consumerGroup);
/**
* 通过别名获取Kafka 代理节点信息
*
- * @param clusterName kafka集群列表
+ * @param clusterName kafka集群名称
* @return
*/
List getBrokerInfos(String clusterName);
/**
* 通过别名获取Kafka 代理节点信息
- *
* @param clusterNames kafka集群列表
* @return
*/
- Map> getAllBrokerInfos(List clusterNames);
+ Map> getBrokerInfos(List clusterNames);
/**
* Get broker host info from ids.
@@ -75,17 +85,22 @@ public interface KafkaService {
String getBrokerJMXFromIds(String clusterAlias, int ids);
/**
- * Obtaining kafka consumer information from zookeeper.
+ * 从Zookeeper获取分页消费者信息
+ *
+ * @param clusterAlias kafka集群名称
+ * @param displayInfo 分页请求
*/
- Map> getConsumers(String clusterAlias);
+ Map> getConsumers(String clusterAlias, DisplayInfo displayInfo);
/**
- * Obtaining kafka consumer page information from zookeeper.
+ * 获取消费组监听主题的偏移量
+ * @param clusterAlias kafka集群名称
+ * @param topic 主题名称
+ * @param group 消费组名
+ * @param partition 分区号
+ * @return
*/
- Map> getConsumers(String clusterAlias, DisplayInfo page);
-
- /** According to group, topic and partition to get offset from zookeeper. */
- OffsetZkInfo getOffset(String clusterAlias, String topic, String group, int partition);
+ OffsetZkInfo getGroupTopicPartitionOffset(String clusterAlias, String topic, String group, int partition);
/** Get kafka 0.10.x offset from topic. */
String getKafkaOffset(String clusterAlias);
@@ -93,12 +108,6 @@ public interface KafkaService {
/** Get the data for the topic partition in the specified consumer group */
Map getKafkaOffset(String clusterAlias, String group, String topic, Set partitionids);
- /** Use kafka console comand to create topic. */
- Map create(String clusterAlias, String topicName, String partitions, String replic);
-
- /** Use kafka console command to delete topic. */
- Map delete(String clusterAlias, String topicName);
-
/**
* 解析集群代理服务器信息
* @param clusterAlias kafka集群名称
@@ -114,18 +123,23 @@ public interface KafkaService {
/** Get kafka 0.10.x consumer topic, maybe consumer topic owner is null. */
Set getKafkaConsumerTopics(String clusterAlias, String group);
-
- /** Get kafka 0.10.x consumer group & topic information. */
- String getKafkaConsumer(String clusterAlias);
/** Get kafka 0.10.x consumer group & topic information used for page. */
String getKafkaConsumer(String clusterAlias, DisplayInfo displayInfo);
+ /**
+ * 获取Kafka元数据信息
+ * @param clusterAlias kafka集群名称
+ * @param group 消费组名称
+ * @return
+ */
+ JSONArray getKafkaMetadata(String clusterAlias, String group);
+
@Deprecated
/** Get kafka consumer information pages. */ String getKafkaActiverSize(String clusterAlias, String group);
/** Get kafka consumer information pages not owners. */
- OwnerInfo getKafkaActiverNotOwners(String clusterAlias, String group);
+ OwnerInfo getKafkaActiveNotOwners(String clusterAlias, String group);
/** Get kafka broker bootstrap server. */
String getKafkaBrokerServer(String clusterAlias);
@@ -140,16 +154,16 @@ public interface KafkaService {
long getKafkaLogSize(String clusterAlias, String topic, int partitionid);
/** Get kafka topic history batch logsize. */
- Map getKafkaLogSize(String clusterAlias, String topic, Set partitionids);
+ Map getKafkaLogSize(String clusterAlias, String topic, Set partitionIds);
/** Get kafka topic real logsize by partitionid. */
- long getKafkaRealLogSize(String clusterAlias, String topic, int partitionid);
+ long getKafkaRealLogSize(String clusterAlias, String topic, int partitionId);
/** Get kafka topic real logsize by partitionid set. */
- long getKafkaRealLogSize(String clusterAlias, String topic, Set partitionids);
+ long getKafkaRealLogSize(String clusterAlias, String topic, Set partitionIds);
/** Get topic producer send logsize records. */
- long getKafkaProducerLogSize(String clusterAlias, String topic, Set partitionids);
+ long getKafkaProducerLogSize(String clusterAlias, String topic, Set partitionIds);
/** Get kafka sasl topic metadate. */
List findKafkaLeader(String clusterAlias, String topic);
@@ -173,9 +187,13 @@ public interface KafkaService {
long getRealLogSize(String clusterAlias, String topic, int partitionid);
/**
- * Get topic metadata.
+ * 获取主题分区的复制分区编号信息
+ * @param clusterAlias kafka集群信息
+ * @param topic 主题名称
+ * @param partitionId 分区id
+ * @return
*/
- String getReplicasIsr(String clusterAlias, String topic, int partitionid);
+ String getTopicPartitionReplicas(String clusterAlias, String topic, int partitionId);
/**
* Get kafka version.
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/MetricsService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/MetricsService.java
index c643d4f45b50820fc95cd8cae6938e40354d5a0b..206d25bebb2b6e4af694401b2ece2a8a002e839b 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/service/MetricsService.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/service/MetricsService.java
@@ -36,7 +36,9 @@ import java.util.Map;
public interface MetricsService {
/**
- * Gets summary monitoring data for all broker.
+ * 获取Kafka集群所有代理节点监控数据
+ * @param clusterAlias kafka集群名称
+ * @return
*/
String getAllBrokersMBean(String clusterAlias);
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/TopicService.java b/src/main/java/org/smartloli/kafka/eagle/web/service/TopicService.java
index 48f6f48fd79ae13d82159aabd99e349244dfa88d..ec61d8915ff652231d5e158d238ce5691910ed4e 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/service/TopicService.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/service/TopicService.java
@@ -34,6 +34,24 @@ import java.util.Map;
*/
public interface TopicService {
+ /**
+ * 创建主题
+ * @param clusterAlias kafka集群名称
+ * @param topicName 主题名称
+ * @param partitions 分区数
+ * @param replica 复制分区数
+ * @return
+ */
+ Map createTopic(String clusterAlias, String topicName, String partitions, String replica);
+
+ /**
+ * 删除主题
+ * @param clusterAlias kafka集群名称
+ * @param topicName 主题名称
+ * @return
+ */
+ Map deleteTopic(String clusterAlias, String topicName);
+
/**
* Find topic name in all topics.
*/
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/AlertServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/AlertServiceImpl.java
index a03492e0b1a5cb16128ebb753b0a04ae21939294..ba508015952409771ddda11698db7f440e77870b 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/AlertServiceImpl.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/AlertServiceImpl.java
@@ -28,6 +28,7 @@ import org.smartloli.kafka.eagle.web.protocol.alarm.AlarmConfigInfo;
import org.smartloli.kafka.eagle.web.protocol.alarm.AlarmConsumerInfo;
import org.smartloli.kafka.eagle.web.protocol.topic.TopicLogSize;
import org.smartloli.kafka.eagle.web.service.AlertService;
+import org.smartloli.kafka.eagle.web.service.ConsumerService;
import org.smartloli.kafka.eagle.web.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -49,10 +50,12 @@ import java.util.regex.Pattern;
@Service
public class AlertServiceImpl implements AlertService {
- /** Kafka service interface. */
@Autowired
private KafkaService kafkaService;
+ @Autowired
+ private ConsumerService consumerService;
+
@Autowired
private AlertDao alertDao;
@@ -83,7 +86,7 @@ public class AlertServiceImpl implements AlertService {
}
private String getAlarmConsumerGroup(String clusterAlias, String search) {
- Map> consumers = kafkaService.getConsumers(clusterAlias);
+ Map> consumers = consumerService.getConsumers(clusterAlias);
JSONArray groups = new JSONArray();
int offset = 0;
if (search.length() > 0) {
@@ -111,7 +114,7 @@ public class AlertServiceImpl implements AlertService {
private String getAlarmConsumerGroupKafka(String clusterAlias, String search) {
int offset = 0;
JSONArray groups = new JSONArray();
- JSONArray consumerGroups = JSON.parseArray(kafkaService.getKafkaConsumer(clusterAlias));
+ JSONArray consumerGroups = JSON.parseArray(consumerService.getKafkaConsumer(clusterAlias));
if (search.length() > 0) {
for (Object object : consumerGroups) {
JSONObject consumerGroup = (JSONObject) object;
@@ -137,7 +140,7 @@ public class AlertServiceImpl implements AlertService {
}
private String getAlarmConsumerTopic(String clusterAlias, String group, String search) {
- Map> consumers = kafkaService.getConsumers(clusterAlias);
+ Map> consumers = consumerService.getConsumers(clusterAlias);
JSONArray topics = new JSONArray();
int offset = 0;
if (search.length() > 0) {
diff --git a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/BrokerServiceImpl.java b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/BrokerServiceImpl.java
index 53c0bfa160c184e474df5d8a10d795a390a77b09..300b7c49fafcb9515270cf1da566aa60bb4a2083 100644
--- a/src/main/java/org/smartloli/kafka/eagle/web/service/impl/BrokerServiceImpl.java
+++ b/src/main/java/org/smartloli/kafka/eagle/web/service/impl/BrokerServiceImpl.java
@@ -432,7 +432,7 @@ public class BrokerServiceImpl implements BrokerService {
metadate.setIsr(topicMetadata.getString("isr"));
metadate.setLeader(topicMetadata.getInteger("leader"));
metadate.setPartitionId(partition);
- metadate.setReplicas(kafkaService.getReplicasIsr(clusterAlias, topic, partition));
+ metadate.setReplicas(kafkaService.getTopicPartitionReplicas(clusterAlias, topic, partition));
long logSize = 0L;
if ("kafka".equals(kafkaClustersConfig.getClusterConfigByName(clusterAlias).getOffsetStorage())) {
logSize = kafkaService.getKafkaRealLogSize(clusterAlias, topic, partition);
@@ -493,7 +493,7 @@ public class BrokerServiceImpl implements BrokerService {
metadate.setIsr(topicMetadata.getString("isr"));
metadate.setLeader(topicMetadata.getInteger("leader"));
metadate.setPartitionId(Integer.valueOf(partition));
- metadate.setReplicas(kafkaService.getReplicasIsr(clusterAlias, topic, Integer.valueOf(partition)));
+ metadate.setReplicas(kafkaService.getTopicPartitionReplicas(clusterAlias, topic, Integer.valueOf(partition)));
targets.add(metadate);
}
}
@@ -547,7 +547,6 @@ public class BrokerServiceImpl implements BrokerService {
return logSize;
}
- /** Get topic real logsize records. */
@Override
public long getTopicRealLogSize(String clusterAlias, String topic) {
long logSize = 0L;
@@ -589,10 +588,10 @@ public class BrokerServiceImpl implements BrokerService {
if (KafkaConstants.CONSUMER_OFFSET_TOPIC.equals(topic)) {
return logSize;
}
- KafkaZkClient zkc = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias);
+ KafkaZkClient zookeeperClient = KafkaResourcePoolUtils.getZookeeperClient(clusterAlias);
try {
- if (zkc.pathExists(BROKER_TOPICS_PATH + "/" + topic)) {
- Tuple2