Kafka 0.8 Producer (for versions before 0.9)


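The legacy Kafka producer is written in Scala. From 0.9 onward it is superseded by the newer Java producer and should not be used for new code.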

Sample code:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Brokers used to fetch topic metadata
        properties.put("metadata.broker.list", "kafka01:9092,kafka02:9092");
        // Encode messages as strings
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        // Wait for the partition leader to acknowledge each write
        properties.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            // KeyedMessage(topic, key, message)
            KeyedMessage<String, String> msg = new KeyedMessage<String, String>("topic", "key", "hello");
            producer.send(msg);
        } finally {
            // Release the producer's network resources
            producer.close();
        }
    }
}
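
The `request.required.acks` setting controls the trade-off between latency and durability: `0` does not wait for any acknowledgement, `1` waits for the partition leader, and `-1` waits for all in-sync replicas. As a minimal sketch, the helper below gathers the same settings into a `Properties` object using only `java.util`. The class `LegacyProducerProps` and its `build` method are hypothetical names used for illustration and are not part of Kafka; restricting the value to `-1`, `0` or `1` is an assumption of this sketch.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): collects settings for the 0.8 producer.
final class LegacyProducerProps {

    // Assumption of this sketch: only -1, 0 and 1 are accepted for requiredAcks.
    static Properties build(String brokerList, int requiredAcks) {
        if (requiredAcks < -1 || requiredAcks > 1) {
            throw new IllegalArgumentException("requiredAcks must be -1, 0 or 1");
        }
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", Integer.toString(requiredAcks));
        return props;
    }

    public static void main(String[] args) {
        // Same broker list as the demo, but waiting for all in-sync replicas
        Properties props = build("kafka01:9092,kafka02:9092", -1);
        System.out.println(props);
    }
}
```

The resulting `Properties` object can be passed to `new ProducerConfig(...)` exactly as in the demo.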

Sample code for a custom partitioner:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {

    // Kafka creates the partitioner reflectively and passes in the producer properties
    public SimplePartitioner(VerifiableProperties props) {
    }

    // Uses the number after the last '.' in the key, modulo the partition count;
    // keys without such a suffix go to partition 0
    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}
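
To make the key-to-partition mapping concrete, here is a dependency-free sketch that applies the same rule as `SimplePartitioner.partition` to a few keys. `PartitionSketch` and `partitionFor` are hypothetical names introduced for illustration. The sketch also shows the `partitioner.class` producer property, which is how the 0.8 producer is told to use a custom partitioner; the bare class name is an assumption, since the original does not give a package.

```java
import java.util.Properties;

// Hypothetical stand-alone copy of the partitioning rule, for experimentation only.
final class PartitionSketch {

    // Same rule as SimplePartitioner: number after the last '.', modulo partition count
    static int partitionFor(String key, int numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("192.168.1.7", 4)); // 7 % 4
        System.out.println(partitionFor("no-dot-key", 4));  // no '.', falls back to 0

        // Registering the partitioner with the producer configuration.
        // Assumption: SimplePartitioner lives in the default package.
        Properties props = new Properties();
        props.put("partitioner.class", "SimplePartitioner");
        System.out.println(props.getProperty("partitioner.class"));
    }
}
```

Note that a key whose suffix after the last `.` is not an integer (for example `www.example.com`) makes `Integer.parseInt` throw a `NumberFormatException`, so this rule only suits keys with a numeric suffix.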

