kafka-console-producer.sh --broker-list 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1
④消费消息
我们也还是先来看看 kafka-console-consumer.sh 的参数吧:
- --topic <String: topic>:指定 topic
- --group <String: consumer group id>:指定消费者组
- --from-beginning:指定从开始进行消费, 如果不指定, 就从当前进行消费
- --bootstrap-server:Kafka 的连接地址
kafka-console-consumer.sh --bootstrap-server 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 ---beginning Kafka 的日志
Kafka 的日志分两种:
- 第一种日志是我们的 Kafka 的启动日志,就是我们排查问题,查看报错信息的日志 。
- 第二种日志就是我们的数据日志,Kafka 是我们的数据是以日志的形式存在存盘中的,我们第二种所说的日志就是我们的 Partiton 与 Segment 。
那我们就来说说备份和分区吧:我们创建一个分区,一个备份,那么 test 就应该在三台机器上或者三个数据目录只有一个 test-0 。(分区的下标是从 0 开始的)
如果我们创建 N 个分区,我们就会在三个服务器上发现,test_0-n,如果我们创建 M 个备份,我们就会在发现,test_0 到 test_n 每一个都是 M 个 。
Kafka API
使用 Kafka 原生的 API
①消费者自动提交
定义自己的生产者:
import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;/** * @ClassName MyKafkaProducer * @Description TODO * @Author lingxiangxiang * @Date 3:37 PM * @Version 1.0 **/public class MyKafkaProducer { private org.apache.kafka.clients.producer.KafkaProducer<Integer, String> producer; public MyKafkaProducer() { Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094"); properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 设置批量发送 properties.put("batch.size", 16384); // 批量发送的等待时间50ms, 超过50ms, 不足批量大小也发送 properties.put("linger.ms", 50); this.producer = new org.apache.kafka.clients.producer.KafkaProducer<Integer, String>(properties); } public boolean sendMsg() { boolean result = true; try { // 正常发送, test2是topic, 0代表的是分区, 1代表的是key, hello world是发送的消息内容 final ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test2", 0, 1, "hello world"); producer.send(record); // 有回调函数的调用 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println(recordMetadata.topic()); System.out.println(recordMetadata.partition()); System.out.println(recordMetadata.offset()); } }); // 自己定义一个类 producer.send(record, new MyCallback(record)); } catch (Exception e) { result = false; } return result; }}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 带你了解黑客使用的命令行浏览器,令人吃惊
- 10分钟教你Python+MySQL数据库操作
- 机架式服务器的详解,一文带你读懂
- 一文带你了解IPsec VPN基本原理与配置流程,干货值得收藏
- 带你了解香港服务器和美国服务器的区别
- 10分钟将你的Go工程转换为Go Module模式
- 带你了解太极拳的健身原理是什么
- 黄瓜|春天减肥,试试这5道“刮油菜”,10分钟端上桌,好吃解腻
- 齐刘海|素颜也好看的女生是什么样子的?女生护肤?带你get护肤小步骤!
- “疙瘩汤”的这个做法火了,10分钟出锅,好吃解馋,怎么也吃不够
