# kfk-client

**Repository Path**: zwhx/kfk-client

## Basic Information

- **Project Name**: kfk-client
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: LGPL-3.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 1
- **Forks**: 0
- **Created**: 2020-07-06
- **Last Updated**: 2020-12-18

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

Simplifies Kafka client operations.

# Goal

Use conventions and a few design patterns to simplify Kafka client operations.

# Examples

## Producer

```java
class Demo {
    // `logger` is assumed to be an SLF4J-style logger field defined elsewhere.
    public static void demo() {
        // Arguments: bootstrap servers, then the topic to publish to.
        MessageProducer producer = new MessageProducer("server1:9092,server2:9092", "test");
        // Send key "hello" with value "hello"; the callback logs the send result.
        producer.sendMessage("hello", "hello", resp -> {
            logger.info(resp.toString());
        });
    }
}
```

## Consumer

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.function.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

class Demo {
    // `logger` and `sleep(...)` are helpers not shown in the original README.
    public static void demo() {
        String servers = "server1:9092,server2:9092";
        String topic = "test";
        String groupId = "test";
        logger.info("starting test consumer, servers: {}, topics: {}", servers, topic);

        // Two consumers in the same group; the second one joins later.
        MessageConsumer consumer = new MessageConsumer(
                servers, groupId, false, Collections.singletonList(topic));
        MessageConsumer consumer1 = new MessageConsumer(
                servers, groupId, false, Collections.singletonList(topic));

        // NOTE: the generic type parameters were lost in the source text.
        // <String, String> is an assumption; use the types MessageConsumer actually delivers.
        Consumer<ConsumerRecords<String, String>> consumerRecordsConsumer = rs -> {
            Iterable<ConsumerRecord<String, String>> records = rs.records(topic);
            for (ConsumerRecord<String, String> record : records) {
                logger.info("receive message, key: {}, value: {}, partition: {}, timestamp: {}, offset: {}",
                        record.key(), record.value(), record.partition(),
                        record.timestamp(), record.offset());
            }
        };
        consumer.listening(consumerRecordsConsumer);

        // Publish a map payload and log the send result in the callback.
        MessageProducer producer = new MessageProducer(servers, topic);
        HashMap<String, String> data = new HashMap<>();
        data.put("1", "2");
        data.put("3", "4");
        data.put("5", "6");
        producer.sendMessage(
                "msg key 1",
                data,
                r -> logger.info("send message, offset: {}, timestamp: {}, partition: {}",
                        r.offset(), r.timestamp(), r.partition()));
        logger.info("message receive test completed, servers: {}, topics: {}", servers, topic);
        sleep(5000);

        // Suspend the first consumer.
        logger.info("consumer suspend test start, servers: {}, topics: {}", servers, topic);
        consumer.suspend();
        logger.info("consumer suspend test completed, servers: {}, topics: {}", servers, topic);
        sleep(5000);

        // Terminate the first consumer.
        logger.info("consumer terminal test start, servers: {}, topics: {}", servers, topic);
        consumer.terminal();
        logger.info("consumer terminal test completed, servers: {}, topics: {}", servers, topic);
        sleep(5000);

        // Start the second consumer and send another message (no callback this time).
        logger.info("new consumer join test start, servers: {}, topics: {}", servers, topic);
        consumer1.listening(consumerRecordsConsumer);
        data.put("7", "7");
        producer.sendMessage("msg key 2", data);
        logger.info("new consumer join test completed, servers: {}, topics: {}", servers, topic);
        sleep(5000);
    }
}
```
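
The Consumer example above takes a consumer through `listening`, `suspend`, and `terminal`. To make that lifecycle easier to picture, here is a minimal, dependency-free sketch of a listener with the same three method names. It is not kfk-client's implementation: the class `ListenerLifecycleSketch`, the in-memory `Queue` standing in for a topic, the explicit `pollOnce()` step, and the idea that calling `listening` again resumes a suspended listener are all assumptions made for illustration. A real client would presumably run the poll step on a background thread against a Kafka consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical sketch of a listening/suspend/terminal lifecycle; not kfk-client code. */
final class ListenerLifecycleSketch {
    enum State { IDLE, LISTENING, SUSPENDED, TERMINATED }

    private final Queue<String> source;        // stands in for a Kafka topic
    private Consumer<List<String>> handler;    // receives each polled batch
    private State state = State.IDLE;

    ListenerLifecycleSketch(Queue<String> source) {
        this.source = Objects.requireNonNull(source);
    }

    /** Start delivering batches to the handler, or resume after suspend (assumed semantics). */
    void listening(Consumer<List<String>> handler) {
        if (state == State.TERMINATED) {
            throw new IllegalStateException("listener already terminated");
        }
        this.handler = Objects.requireNonNull(handler);
        state = State.LISTENING;
    }

    /** Stop delivering messages but keep the listener reusable. */
    void suspend() {
        if (state == State.LISTENING) {
            state = State.SUSPENDED;
        }
    }

    /** Stop for good; the listener cannot be restarted afterwards. */
    void terminal() {
        state = State.TERMINATED;
        handler = null;
    }

    /** One iteration of the poll loop; returns how many messages were delivered. */
    int pollOnce() {
        if (state != State.LISTENING) {
            return 0; // suspended, terminated, or never started: leave messages queued
        }
        List<String> batch = new ArrayList<>();
        String msg;
        while ((msg = source.poll()) != null) {
            batch.add(msg);
        }
        if (!batch.isEmpty()) {
            handler.accept(batch);
        }
        return batch.size();
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        Queue<String> topic = new ArrayDeque<>();
        List<String> seen = new ArrayList<>();
        ListenerLifecycleSketch listener = new ListenerLifecycleSketch(topic);

        listener.listening(seen::addAll);
        topic.add("a");
        listener.pollOnce();              // delivers "a"

        listener.suspend();
        topic.add("b");
        listener.pollOnce();              // delivers nothing while suspended

        listener.listening(seen::addAll); // resume
        listener.pollOnce();              // delivers the queued "b"

        listener.terminal();
        topic.add("c");
        listener.pollOnce();              // delivers nothing after termination

        System.out.println(seen);         // prints [a, b]
    }
}
```

Driving the loop through an explicit `pollOnce()` keeps the sketch deterministic and easy to test; it says nothing about how the library schedules its own polling.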