10分钟带你逆袭Kafka!( 九 )


 
kafka-console-producer.sh --broker-list 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 
④消费消息
 
我们也还是先来看看 kafka-console-consumer.sh 的参数吧:
 

  • --topic <String: topic>:指定 topic
  • --group <String: consumer group id>:指定消费者组
  • --from-beginning:指定从开始进行消费, 如果不指定, 就从当前进行消费
  • --bootstrap-server:Kafka 的连接地址
kafka-console-consumer.sh --bootstrap-server 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 ---beginning 
Kafka 的日志
 
Kafka 的日志分两种:
 
  • 第一种日志是我们的 Kafka 的启动日志,就是我们排查问题,查看报错信息的日志 。
  • 第二种日志就是我们的数据日志,Kafka 是我们的数据是以日志的形式存在存盘中的,我们第二种所说的日志就是我们的 Partiton 与 Segment 。
 
那我们就来说说备份和分区吧:我们创建一个分区,一个备份,那么 test 就应该在三台机器上或者三个数据目录只有一个 test-0 。(分区的下标是从 0 开始的)
 
如果我们创建 N 个分区,我们就会在三个服务器上发现,test_0-n,如果我们创建 M 个备份,我们就会在发现,test_0 到 test_n 每一个都是 M 个 。
 
  
Kafka API
 
使用 Kafka 原生的 API
 
①消费者自动提交 
定义自己的生产者:
 
import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;/** * @ClassName MyKafkaProducer * @Description TODO * @Author lingxiangxiang * @Date 3:37 PM * @Version 1.0 **/public class MyKafkaProducer {    private org.apache.kafka.clients.producer.KafkaProducer<Integer, String> producer;    public MyKafkaProducer() {        Properties properties = new Properties();        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        // 设置批量发送        properties.put("batch.size", 16384);        // 批量发送的等待时间50ms, 超过50ms, 不足批量大小也发送        properties.put("linger.ms", 50);        this.producer = new org.apache.kafka.clients.producer.KafkaProducer<Integer, String>(properties);    }    public boolean sendMsg() {        boolean result = true;        try {            // 正常发送, test2是topic, 0代表的是分区, 1代表的是key, hello world是发送的消息内容            final ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test2", 0, 1, "hello world");            producer.send(record);            // 有回调函数的调用            producer.send(record, new Callback() {                @Override                public void onCompletion(RecordMetadata recordMetadata, Exception e) {                    System.out.println(recordMetadata.topic());                    System.out.println(recordMetadata.partition());                    System.out.println(recordMetadata.offset());                }            });          // 自己定义一个类            producer.send(record, new MyCallback(record));        } catch (Exception e) {            result = false;        }        return result;    }}


推荐阅读