Troubleshooting OCI streaming Kafka Java Client


PRODUCERS

Let’s use the following producer example:

package javiermugueta.blog;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Minimal example producer that publishes 100 small string messages to an
 * OCI Streaming stream through its Kafka-compatible endpoint.
 *
 * <p>Connection settings are held in the static fields below and must be
 * replaced with real values before running.
 */
public class Producer {

    static String bootstrapServers = "c....oraclecloud.com:9092";
    static String tenancyName = "x......k";
    static String username = "oracleidentitycloudservice/F....s";
    static String streamPoolId = "ocid1.streampool.oc1.eu-frankf...q";
    static String authToken = "6...V"; 
    static String streamOrKafkaTopicName = "a....l";
    static String proto = "SASL_SSL";

    /**
     * Builds the Kafka client configuration for the producer.
     *
     * <p>Authentication uses SASL/PLAIN: the JAAS user name is
     * {@code tenancyName/username/streamPoolId} and the password is the auth token.
     *
     * @return the properties to hand to {@link KafkaProducer}
     */
    private static Properties getKafkaProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("security.protocol", proto);
        properties.put("sasl.mechanism", "PLAIN");
        // Only needed when connecting by IP address (SSL handshake fails with
        // "No subject alternative names matching IP address ..."):
        //properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/"
                + username + "/"
                + streamPoolId + "\" "
                + "password=\""
                + authToken + "\";";

        properties.put("sasl.jaas.config", value);
        properties.put("retries", 3); // retries on transient errors and load balancing disconnection
        properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB
        // Recent Kafka client libraries default enable.idempotence to true, which fails
        // against this endpoint with "UnsupportedVersionException: The broker does not
        // support INIT_PRODUCER_ID". Setting it explicitly keeps the example working
        // regardless of the client version.
        properties.put("enable.idempotence", false);
        return properties;
    }

    /**
     * Sends 100 keyed messages asynchronously, reports the outcome of each one
     * from the send callback, then flushes and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources guarantees the producer (and its network threads) is
        // closed even if send() or flush() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(getKafkaProperties())) {
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "{ a=" +i +", b=" + 2*i + ", c=" + 3*i +" }" );
                producer.send(record, (md, ex) -> {
                    if (ex != null) {
                        System.err.println("exception occurred in producer for review :" + record.value()
                                + ", exception is " + ex);
                        ex.printStackTrace();
                    } else {
                        System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp());
                    }
                });
            }
            // producer.send() is async, to make sure all messages are sent we use producer.flush()
            producer.flush();
        } catch (Exception e) {
            System.err.println("Error: exception " + e);
        }
    }
}

1 – org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed

tenancyName and/or username and/or streamPoolId and/or authToken are not correct

2 – org.apache.kafka.common.errors.TimeoutException: Topic XXXXXX not present in metadata after MMMM ms

bootstrapServers and/or streamOrKafkaTopicName are not correct

When “security.protocol” property is not “SASL_SSL”

When “sasl.mechanism” property is not “PLAIN”

3 – org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names matching IP address xxx.yyy.zzz.ttt found

Use an appropriate DNS configuration or set:

properties.put("ssl.endpoint.identification.algorithm", "");

4 – exception occurred in producer …, exception is org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition

Incorrect or insufficient privileges (policies):

allow any-user to use stream-family in compartment xploraDEV where any {request.user.id='....', request.user.id='...' }
---
allow any-user to {STREAM_READ, STREAM_CONSUME} in compartment xploraDEV where any {request.user.id='...', request.user.id='...' }

(*) or {request.groups.id='...'}

NOTE: Policies can also be restricted with policy syntax like this and this

CONSUMERS

Let’s use the following consumer example:

package javiermugueta.blog;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;


/**
 * Minimal example consumer that subscribes to one OCI Streaming stream through
 * its Kafka-compatible endpoint, performs a single poll, prints whatever was
 * returned and commits the offsets.
 *
 * <p>Connection settings are held in the static fields below and must be
 * replaced with real values before running.
 */
public class Consumer {
    static String bootstrapServers = "138.1.1.194:9092"; //"cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com:9092";
    static String tenancyName = "x....k";
    static String username = "oracleidentitycloudservice/F...s";
    static String streamPoolId = "ocid1.streampool.oc1.e...a";
    static String authToken = "6...V"; 
    static String streamOrKafkaTopicName = "test";
    static String consumerGroupName = "G00001"; 

    static String proto =  "SASL_SSL"; 

    /**
     * Builds the Kafka client configuration for the consumer.
     *
     * <p>Authentication uses SASL/PLAIN: the JAAS user name is
     * {@code tenancyName/username/streamPoolId} and the password is the auth token.
     * Both key and value are deserialized as strings.
     *
     * @return the properties to hand to {@link KafkaConsumer}
     */
    private static Properties getKafkaProperties(){
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", consumerGroupName);
        props.put("security.protocol", proto);
        props.put("sasl.mechanism", "PLAIN");
        // Empty value is set because bootstrapServers above is an IP address, which
        // otherwise fails the SSL handshake ("No subject alternative names matching
        // IP address ..."). NOTE(review): prefer the DNS name and drop this line
        // where possible.
        props.put("ssl.endpoint.identification.algorithm", "");
        // Offsets are committed explicitly with commitSync() in main().
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/"
                + username + "/"
                + streamPoolId + "\" "
                + "password=\""
                + authToken + "\";";
        props.put("sasl.jaas.config", value);
        return props;
    }

    /**
     * Polls the topic once (up to 10 seconds), prints the records received and
     * commits their offsets.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Key type is String to match the configured StringDeserializer; declaring it
        // as Integer would compile but mis-type every key at runtime.
        // try-with-resources closes the consumer even if subscribe/poll/commit throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getKafkaProperties())) {
            consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName));
            // poll(Duration) replaces the deprecated poll(long) overload.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));

            System.out.println("size of records polled is "+ records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            }

            consumer.commitSync();
        }
    }
}

1 – ERRORS 1, 2, 3 AND 4 EXPLAINED IN THE PREVIOUS SECTION

2 – NO ERROR RAISED AT ALL BUT NO MESSAGE RETRIEVED AND YOU ARE 100% CONFIDENT THAT MESSAGES WERE PUBLISHED TO THE TOPIC (size of records polled is 0)

If the topic name you are using doesn’t exist, the client does not raise any error; therefore, double-check that you are using the correct topic name.

That’s all, hope it helps!!

3 Comments

  1. Marc

    I have an error:
    org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support INIT_PRODUCER_ID

    It happens with the latest Kafka client libraries where enable.idempotence configuration defaults to true.

    Work-around:
    – add

    Properties properties = new Properties();

    properties.put("enable.idempotence", false);

    Like

Leave a reply to Marc Cancel reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.