K6 Kafka extension: How to apply JMeter Kafka settings using the Kafka extension

Hi there,
I have attached an image of my JMeter Kafka settings. How do I specify these settings with the Kafka extension? In the Writer object I only see options for brokers and topic, so where and how do I set all the others?

[image: screenshot of JMeter Kafka settings]

Hi @Shabd,

All the exported functions should be available by importing the module object from k6/x/kafka. The exported objects, constants, and other data structures are listed in index.d.ts. You can also find some examples under https://github.com/mostafa/xk6-kafka/tree/main/scripts.

Are you missing any of the configuration options you are seeing when using JMeter?

Cheers!

Hi @eyeveebe, thanks for replying.
Yes, I am confused about some settings. For example, I see that the extension's producer config for Kafka supports the settings below.

export interface WriterConfig {
  brokers: string[];
  topic: string;
  autoCreateTopic: boolean;
  balancer: BALANCERS;
  maxAttempts: number;
  batchSize: number;
  batchBytes: number;
  batchTimeout: number;
  readTimeout: number;
  writeTimeout: number;
  compression: COMPRESSION_CODECS;
  sasl: SASLConfig;
  tls: TLSConfig;
  connectLogger: boolean;
}

But what if I have Kafka producer configurations like linger.ms, acks, request.timeout.ms, retry.backoff.ms, etc.?

Hey @Shabd,

Thanks for pointing this out. I added the missing requiredAcks parameter to WriterConfig in the JS API docs.

xk6-kafka uses kafka-go under the hood and exposes the parameters that package does. I understand that the naming of the parameters is a bit different (though not unconventional) from the official Kafka client library for Java or from JMeter, which means that JMeter's config parameters don't necessarily map 1:1 to xk6-kafka's.
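To make the "not 1:1" point concrete, here is a rough sketch of how the producer settings from the question might line up with the WriterConfig fields quoted earlier. Only acks → requiredAcks comes from this thread; every other pairing is an assumption based on the field names and is not an official mapping. The table and the helper mapProducerSettings are hypothetical names used only for illustration, and no values are converted, since units and semantics may differ between the two clients.

```javascript
// Hypothetical lookup table: Java/JMeter producer property -> closest
// WriterConfig field(s). Approximate, name-based guesses (assumptions),
// except acks -> requiredAcks, which is mentioned in the reply above.
// null means no counterpart among the WriterConfig fields listed in this thread.
const PRODUCER_TO_WRITER = new Map([
  ["acks", ["requiredAcks"]],
  ["linger.ms", ["batchTimeout"]],
  ["batch.size", ["batchBytes"]],
  ["retries", ["maxAttempts"]],
  ["compression.type", ["compression"]],
  ["request.timeout.ms", ["readTimeout", "writeTimeout"]],
  ["retry.backoff.ms", null],
]);

// Splits producer property names into those with a likely WriterConfig
// counterpart and those without one. Values are deliberately not translated.
function mapProducerSettings(propertyNames) {
  const mapped = {};
  const unmapped = [];
  for (const name of propertyNames) {
    const fields = PRODUCER_TO_WRITER.get(name);
    if (fields) {
      mapped[name] = fields;
    } else {
      unmapped.push(name);
    }
  }
  return { mapped, unmapped };
}

// The settings asked about in this thread.
const result = mapProducerSettings([
  "linger.ms",
  "acks",
  "request.timeout.ms",
  "retry.backoff.ms",
]);
console.log(JSON.stringify(result, null, 2));
```

Anything that ends up in the unmapped list would need to be checked against index.d.ts or the kafka-go documentation before assuming it can be set at all.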


Thanks @mostafa for replying. I have one question. Below is the sample code I am trying to run to post messages to a Kafka topic.

// produce messages on kafka topic

import {
  Writer,
  SchemaRegistry,
  SASL_PLAIN,
  SCHEMA_TYPE_STRING,
  SCHEMA_TYPE_JSON,
} from "k6/x/kafka";

import { check } from "k6";

const saslConfig = {
  username: "username",
  password: "secret",
  algorithm: SASL_PLAIN,
};

const writ = new Writer({
  brokers: ["azure.confluent.c:9080"],
  topic: "Mtopic.internal.any.v1",
  sasl: saslConfig,
});

const BasicAut = {
  username: "username",
  password: "secret",
};

const schemaReg = new SchemaRegistry({
  url: "https://0p.westeurope",
  basicAuth: BasicAut,
});

export default function () {
  let messages = [
    {
      value: schemaReg.serialize({
        data: {
          correlationId: "49df3bc6-0c57-43d8-94e9-7c02c61890f7 ",
          pro: {
            customerNo: "FZBA8VJ8X1",
            finalDischarge: "IN",
            eventTaskStatus: {
              arrangeCargoRelease: true,
              approve: false,
              approveMonetary: true,
            },
          },
        },
        schemaType: SCHEMA_TYPE_JSON,
      }),
    },
  ];

  writ.produce({ messages: messages });
}

export function teardown(data) {
  writ.close();
}

Running it throws this error:

time="2022-12-20T12:37:29+05:30" level=error msg="Error writing messages., OriginalError: %!w(*errors.errorString=&{unexpected EOF})" error="Error writing messages., OriginalError: %!w(*errors.errorString=&{unexpected EOF})"
ERRO[0000] GoError: Error writing messages., OriginalError: %!w(*errors.errorString=&{unexpected EOF})
at github.com/mostafa/xk6-kafka.(*Kafka).writerClass.func1 (native)
at file:///C:/Users/SAD/OneDrive%20-%20Ma%20Group/Desktop/k6practice/ohm-performance/VV/kafka.js:82:29(51)
at native executor=per-vu-iterations scenario=default source=stacktrace

I am not sure what I am doing wrong. Writing messages throws an unexpected EOF error.

@Shabd
I think you have a configuration issue, probably with your SASL config, as the unexpected EOF implies that. Try using SASL_SSL with your credentials to see if it works.
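As a minimal sketch of that suggestion, the only change to the script above would be the algorithm constant in the SASL config. The credentials, broker address, and topic below are the placeholders from the original script, and the variable name writer is just illustrative. Whether any extra TLS settings are needed depends on the cluster, so treat this as a starting point rather than a verified configuration.

```javascript
import { Writer, SASL_SSL } from "k6/x/kafka";

// Same placeholder credentials as in the script above;
// only the algorithm differs.
const saslConfig = {
  username: "username",
  password: "secret",
  algorithm: SASL_SSL, // was SASL_PLAIN in the failing script
};

// Broker and topic are the placeholder values from the original script.
const writer = new Writer({
  brokers: ["azure.confluent.c:9080"],
  topic: "Mtopic.internal.any.v1",
  sasl: saslConfig,
});
```

The rest of the script (schema registry setup, produce call, teardown) would stay as it is.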

The only other thing I found is setting KAFKA_ADVERTISED_LISTENERS correctly, so that Kafka inside the Docker container doesn't get confused. Your script excerpt shows that you are using a Confluent Kafka deployment on Azure, so this probably doesn't apply. However, you can read more about it here:


Thanks @mostafa. It was a config issue; SASL_SSL worked. I was using the wrong security protocol.
