PRODUCERS
Let’s use the following producer example:
package javiermugueta.blog;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {

    static String bootstrapServers = "c....oraclecloud.com:9092";
    static String tenancyName = "x......k";
    static String username = "oracleidentitycloudservice/F....s";
    static String streamPoolId = "ocid1.streampool.oc1.eu-frankf...q";
    static String authToken = "6...V";
    static String streamOrKafkaTopicName = "a....l";
    static String proto = "SASL_SSL";

    private static Properties getKafkaProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("security.protocol", proto);
        properties.put("sasl.mechanism", "PLAIN");
        //properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/" + username + "/" + streamPoolId + "\" "
                + "password=\"" + authToken + "\";";
        properties.put("sasl.jaas.config", value);
        properties.put("retries", 3); // retries on transient errors and load balancer disconnection
        properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB
        return properties;
    }

    public static void main(String[] args) {
        try {
            Properties properties = getKafkaProperties();
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName,
                        "messageKey" + i, "{ a=" + i + ", b=" + 2 * i + ", c=" + 3 * i + " }");
                producer.send(record, (md, ex) -> {
                    if (ex != null) {
                        System.err.println("exception occurred in producer for review: " + record.value()
                                + ", exception is " + ex);
                        ex.printStackTrace();
                    } else {
                        System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset()
                                + " at " + md.timestamp());
                    }
                });
            }
            // producer.send() is async; flush() makes sure all messages are sent before closing
            producer.flush();
            producer.close();
        } catch (Exception e) {
            System.err.println("Error: exception " + e);
        }
    }
}
1 – org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed
tenancyName and/or username and/or streamPoolId and/or authToken are not correct
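A common cause of this error is malforming the composite username inside sasl.jaas.config, which must be the concatenation tenancyName/username/streamPoolId. A minimal sketch (with placeholder values, not real credentials) that builds the exact JAAS line the producer above uses:

```java
public class JaasConfigBuilder {
    // Placeholder values; replace with your real tenancy name, IDCS user and stream pool OCID
    static String tenancyName = "mytenancy";
    static String username = "oracleidentitycloudservice/user@example.com";
    static String streamPoolId = "ocid1.streampool.oc1.eu-frankfurt-1.example";
    static String authToken = "my-auth-token";

    // Builds the string expected by the "sasl.jaas.config" property
    static String buildJaasConfig() {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/" + username + "/" + streamPoolId + "\" "
                + "password=\"" + authToken + "\";";
    }

    public static void main(String[] args) {
        System.out.println(buildJaasConfig());
    }
}
```

Printing the built string (with the token masked) before creating the producer makes it easy to spot a missing slash or a swapped field.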
2 – org.apache.kafka.common.errors.TimeoutException: Topic XXXXXX not present in metadata after MMMM ms
bootstrapServers and/or streamOrKafkaTopicName are not correct
When the "security.protocol" property is not "SASL_SSL"
When the "sasl.mechanism" property is not "PLAIN"
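Since several different misconfigurations surface as the same TimeoutException, a small preflight check (a hypothetical helper, not part of the Kafka API) can catch them before the client is even created:

```java
import java.util.Properties;

public class ConfigPreflight {
    // Returns an error message for a known misconfiguration, or null if the basics look right.
    static String check(Properties p) {
        if (p.get("bootstrap.servers") == null) {
            return "bootstrap.servers is not set";
        }
        if (!"SASL_SSL".equals(p.get("security.protocol"))) {
            return "security.protocol must be SASL_SSL for OCI Streaming";
        }
        if (!"PLAIN".equals(p.get("sasl.mechanism"))) {
            return "sasl.mechanism must be PLAIN for OCI Streaming";
        }
        return null;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "host:9092");
        p.put("security.protocol", "SASL_SSL");
        p.put("sasl.mechanism", "PLAIN");
        System.out.println(check(p)); // null means the basic settings look correct
    }
}
```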
3 – org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names matching IP address xxx.yyy.zzz.ttt found
Use an appropriate DNS configuration, or set:
properties.put("ssl.endpoint.identification.algorithm", "");
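This SAN error typically appears when bootstrap.servers points at an IP address instead of the broker's DNS name: the server certificate carries DNS names, so hostname verification against an IP fails. A small sketch that flags IP-literal bootstrap endpoints (a rough IPv4 check for illustration, not a full validator):

```java
public class BootstrapCheck {
    // Rough IPv4-literal detection; hostname verification fails against IPs
    // because the broker certificate's subject alternative names are DNS names.
    static boolean isIpLiteral(String bootstrap) {
        String host = bootstrap.split(":")[0];
        return host.matches("\\d{1,3}(\\.\\d{1,3}){3}");
    }

    public static void main(String[] args) {
        System.out.println(isIpLiteral("138.1.1.194:9092"));   // IP literal -> expect SAN issues
        System.out.println(isIpLiteral("cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com:9092"));
    }
}
```

Disabling endpoint identification as shown above works around the check, but using the broker's DNS name is the safer fix.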
4 – exception occurred in producer …, exception is org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition
Incorrect or insufficient privileges (policies):
allow any-user to use stream-family in compartment xploraDEV where any {request.user.id='....', request.user.id='...'}
allow any-user to {STREAM_READ, STREAM_CONSUME} in compartment xploraDEV where any {request.user.id='...', request.user.id='...'} (*) or {request.groups.id='...'}
NOTE: Policies can also be restricted further with finer-grained policy syntax.
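For example, instead of granting stream-family you can grant only the resource type each side needs. A sketch under the assumption that your users belong to dedicated groups (the group names below are placeholders):

```
allow group StreamProducers to use stream-push in compartment xploraDEV
allow group StreamConsumers to use stream-pull in compartment xploraDEV
```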
CONSUMERS
Let’s use the following consumer example:
package javiermugueta.blog;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {

    static String bootstrapServers = "138.1.1.194:9092"; // "cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com:9092";
    static String tenancyName = "x....k";
    static String username = "oracleidentitycloudservice/F...s";
    static String streamPoolId = "ocid1.streampool.oc1.e...a";
    static String authToken = "6...V";
    static String streamOrKafkaTopicName = "test";
    static String consumerGroupName = "G00001";
    static String proto = "SASL_SSL";

    private static Properties getKafkaProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", consumerGroupName);
        props.put("security.protocol", proto);
        props.put("sasl.mechanism", "PLAIN");
        props.put("ssl.endpoint.identification.algorithm", "");
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/" + username + "/" + streamPoolId + "\" "
                + "password=\"" + authToken + "\";";
        props.put("sasl.jaas.config", value);
        return props;
    }

    public static void main(String[] args) {
        // Key type must match the configured StringDeserializer
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getKafkaProperties());
        consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName));
        // poll(long) is deprecated in recent clients; use the Duration overload
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        System.out.println("size of records polled is " + records.count());
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received message: (" + record.key() + ", " + record.value()
                    + ") at offset " + record.offset());
        }
        consumer.commitSync();
        consumer.close();
    }
}
1 – ERRORS 1, 2, 3 AND 4 EXPLAINED IN THE PREVIOUS SECTION
2 – NO ERROR RAISED AT ALL BUT NO MESSAGE RETRIEVED, AND YOU ARE 100% CONFIDENT THAT MESSAGES WERE PUBLISHED TO THE TOPIC (size of records polled is 0)
If the topic name you are using doesn't exist, the client doesn't raise any error, so double-check that you are using the correct topic name.
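Topic names are case-sensitive. Assuming you can obtain the set of existing topics (for example from Kafka's AdminClient.listTopics()), a small hypothetical helper can distinguish an exact match from a case mismatch:

```java
import java.util.Set;

public class TopicNameCheck {
    // Returns a diagnostic for a configured topic name against the topics that actually exist.
    static String diagnose(Set<String> existingTopics, String configured) {
        if (existingTopics.contains(configured)) {
            return "OK";
        }
        for (String t : existingTopics) {
            if (t.equalsIgnoreCase(configured)) {
                return "case mismatch: found '" + t + "'";
            }
        }
        return "topic not found";
    }

    public static void main(String[] args) {
        Set<String> topics = Set.of("test", "Orders");
        System.out.println(diagnose(topics, "test"));
        System.out.println(diagnose(topics, "orders"));
        System.out.println(diagnose(topics, "payments"));
    }
}
```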
That’s all, hope it helps!
With the latest Kafka client library (the enable.idempotence default changed to true) you also need to add:
properties.put("enable.idempotence", false);
Otherwise you get an error like this:
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support INIT_PRODUCER_ID
I have an error:
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support INIT_PRODUCER_ID
It happens with the latest Kafka client libraries, where the enable.idempotence configuration defaults to true.
Work-around: add
Properties properties = new Properties();
…
properties.put("enable.idempotence", false);