企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 老API ~~~ import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class OldProducer { @SuppressWarnings("deprecation") public static void main(String[] args) { Properties properties = new Properties(); properties.put("metadata.broker.list", "master:9092"); properties.put("request.required.acks", "1"); properties.put("serializer.class", "kafka.serializer.StringEncoder"); Producer<Integer, String> producer = new Producer<Integer,String>(new ProducerConfig(properties)); KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world"); producer.send(message ); } } ~~~ # 新API ## 创建生产者 ~~~ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; //生产者,这个api是只管发 public class CustomProducer { public static void main(String[] args) { Properties props = new Properties(); //kafka服务端的主机名和端口号 props.put("bootstrap.servers", "master:9092"); //等待所有副本节点的应答,如果你想3个副本应答就可以的话,这边也可以写3 props.put("acks", "all"); //消息发送最大尝试次数 props.put("retries", 0); //一批消息处理大小,批量大小,有点缓存的意思 props.put("batch.size", 16384); //请求延时,一直请求 props.put("linger.ms", 1); //发送缓存区内存大小,攒到多少才写到磁盘上 props.put("buffer.memory", 33554432); //key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //可以在new producer之前添加自定义拦截器 //ArrayList<Object> arrayList = new ArrayList<>(); //arrayList.add("com.jdxia.interceptor.TimeInterceptor"); //arrayList.add("com.jdxia.interceptor.CounterInterceptor"); //props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, arrayList); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 50; i++) { //发送数据,消息发送到test2这个主题,后面参数是key和value producer.send(new ProducerRecord<>("test1", Integer.toString(i), "hello world-" + i)); //还可以指定分区,这边是指定第0个分区 //producer.send(new ProducerRecord<String, String>("test2", 0 ,Integer.toString(i), "hello world-" + i)); } producer.close(); } } ~~~ ## 创建生产者(新API) ~~~ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; //生产者 public class CallBackProducer { public static void main(String[] args) { Properties props = new Properties(); //kafka服务端的主机名和端口号 props.put("bootstrap.servers", "master:9092"); /* * request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1 * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。 * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。 * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。 * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。 * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。 * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。 * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失 */ //props.put("request.required.acks", "1"); //等待所有副本节点的应答,如果你想3个副本应答就可以的话,这边也可以写3 props.put("acks", "all"); //消息发送最大尝试次数 props.put("retries", 0); //一批消息处理大小,批量大小,有点缓存的意思 props.put("batch.size", 16384); //请求延时,一直请求 props.put("linger.ms", 1); //发送缓存区内存大小,攒到多少才写到磁盘上 props.put("buffer.memory", 33554432); //key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //自定义分区,可以指定自定义分区,也可以不指定 //props.put("partitioner.class", "com.jdxia.kafka.CustomPartitioner"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props); for (int i = 0; i < 50; i++) { //也可以指定分区h和key,这边分区是第0个分区,key是aaa //kafkaProducer.send(new ProducerRecord<String, String>("test1", 0, "aaa", "hello" + i), new Callback() { //test1是topic,后面是发送的消息 kafkaProducer.send(new ProducerRecord<String, String>("test1", "hello" + i), new Callback() { //发送完成的方法,如果有异常可以拿到异常的 @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (recordMetadata != null) { //打印发送这条数据,在哪个分区和他的偏移量 System.out.printf(recordMetadata.partition() + "---" + recordMetadata.offset()); } } }); } kafkaProducer.close(); } } ~~~