Error while posting message on Kafka topic using xk6 extension

//import { check } from "k6";
import {
  Writer,
  SchemaRegistry,
  SCHEMA_TYPE_AVRO,
  SCHEMA_TYPE_JSON,
  SASL_SSL,
  TLS_1_0,
  TLS_1_1,
  TLS_1_2,
  TLS_1_3,
} from "k6/x/kafka"; // import kafka extension

const saslConfig = {
  username: "v1",
  password: "g",
  algorithm: SASL_SSL,
};

const t = {
  enableTls: true,
};

const writer = new Writer({
  brokers: ["digital.net:443"],
  topic: "any.v1",
  sasl: saslConfig,
  tls: t,
});

const tlsConfig = {
  enableTls: true,
  insecureSkipTlsVerify: true,
  clientCertPem: "./truststore_rootca.pem",
  clientKeyPem: "./keystore_retina.pem",
  serverCaPem: "./serverca1.pem",
};

const schemaReg = new SchemaRegistry({
  url: "https://sk-digital.net",
  tls: tlsConfig,
});

export default function () {
  let messages = [
    {
      value: schemaReg.serialize({
        data: {
          Name: "SHABDA_PRODUCT",
          domain: "sHABD_DOMAIN",
          ordernumber: "M123",
        },
        schemaType: SCHEMA_TYPE_JSON,
      }),
    },
  ];
  writer.produce({ messages: messages });
}

Hi team, when I try to execute the above code, I get the error below:

GoError: Error creating x509 key pair from "./truststore_rootca.pem" and "./keystore_retina.pem"., OriginalError: %!w(*errors.errorString=&{tls: found a certificate rather than a key in the PEM for the private key})

Can you please help?

We had truststore.jks and keystore.jks files, which we converted to .pem files and referenced in the code above, and we have a ca.p12 server CA file, which we also converted to a .pem file for the serverCaPem field.

@mostafa Can you please help here?

Hey @Shabd,

You are feeding the wrong files to the config object, as stated in the error:

tls: found a certificate rather than a key in the PEM for the private key

And handling JKS files is tricky. I recommend reading this SO Q&A. Also, please upvote this issue so I can see whether enough users are interested.
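Roughly, the config should separate the client certificate from its private key. Here is a minimal sketch, assuming your keystore.jks holds the client certificate and key; all file names are placeholders, and the commands in the comments are the usual JKS-to-PEM route:

// Typical JKS-to-PEM conversion (run outside k6; file names are placeholders):
//   keytool -importkeystore -srckeystore keystore.jks \
//     -destkeystore keystore.p12 -deststoretype PKCS12
//   openssl pkcs12 -in keystore.p12 -nokeys -out client_cert.pem
//   openssl pkcs12 -in keystore.p12 -nocerts -nodes -out client_key.pem
const tlsConfig = {
  enableTls: true,
  clientCertPem: "./client_cert.pem", // a certificate, not a truststore dump
  clientKeyPem: "./client_key.pem", // a private key, not another certificate
  serverCaPem: "./server_ca.pem", // the CA that signed the server certificate
};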


@mostafa After some trial and error I was able to connect to the Kafka cluster and post a message on the topic. But some errors occurred, and I have some queries, which I list below.
Here is the sample code I used:

//import { check } from "k6";
import {
  Writer,
  SchemaRegistry,
  SCHEMA_TYPE_AVRO,
  SCHEMA_TYPE_JSON,
  SASL_SSL,
  SASL_SCRAM_SHA512,
  TLS_1_0,
  TLS_1_1,
  TLS_1_2,
  TLS_1_3,
} from "k6/x/kafka"; // import kafka extension

const saslConfig = {
  username: "v1",
  password: "07g",
  algorithm: SASL_SCRAM_SHA512,
};

// TLS settings for the Kafka brokers
const t = {
  enableTls: true,
  serverCaPem: "./ca_crt1.pem",
};

// Client certificate and key for the Schema Registry
const tlsConfig = {
  clientCertPem: "./client_cert.pem",
  clientKeyPem: "./client_key.pem",
};

const writer = new Writer({
  brokers: ["pp-east:443"],
  topic: "ms.kotsssss",
  sasl: saslConfig,
  tls: t,
});

const schemaReg = new SchemaRegistry({
  url: "https://pp-hello.net",
  tls: tlsConfig,
});

export default function () {
  let messages = [
    {
      value: schemaReg.serialize({
        data: {
          product: "SHAB",
          domai: "sHA",
          ordernumber: "M1",
        },
        schemaType: SCHEMA_TYPE_JSON,
      }),
    },
  ];
  writer.produce({ messages: messages });
}


The schema to be used is in Avro format.

When I used schemaType JSON, as in the code above, I was able to connect to the Kafka cluster and produce a message on the topic. However, the message was not consumed because the consumer threw the error below:

caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

When I changed to the Avro schema type, which is the accepted one, it throws the error below:

ERRO[0000] panic: runtime error: invalid memory address or nil pointer dereference
goroutine 80 [running]:
runtime/debug.Stack()
        runtime/debug/stack.go:24 +0x65
go.k6.io/k6/js/common.RunWithPanicCatching.func1()   
        go.k6.io/k6@v0.43.1/js/common/util.go:82 +0x191
panic({0x17e5880, 0x2bc0050})
        runtime/panic.go:884 +0x212
github.com/dop251/goja.(*Runtime).runW

WARN[0000] Error from API server
    error="listen tcp 127.0.0.1:6565: bind: Only one 
usage of each socket address (protocol/network address/port) is normally permitted."

Not sure what's causing this.

@Shabd

Have a look at this script if you want to work with Avro while using a Schema Registry and this one if you don’t use a Schema Registry.

When changing schemaType to one with an actual schema, like Avro, you have to pass the schema to the serialize and deserialize functions, otherwise it won’t work.
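As a rough sketch (the subject and field names below are made up, and schemaReg and writer are assumed to be configured as in your script), the Schema Registry flow looks like this. Registering the schema is what adds the Confluent wire-format prefix (a magic byte plus the schema ID) to each message; its absence is what caused the "Unknown magic byte!" error on the consumer side:

const valueSchema = JSON.stringify({
  type: "record",
  name: "Order",
  namespace: "com.example",
  fields: [{ name: "ordernumber", type: "string" }],
});

// Register the schema (or fetch an existing one with getSchema)
const valueSchemaObject = schemaReg.createSchema({
  subject: "ms.kotsssss-value", // hypothetical subject name
  schema: valueSchema,
  schemaType: SCHEMA_TYPE_AVRO,
});

export default function () {
  writer.produce({
    messages: [
      {
        value: schemaReg.serialize({
          data: { ordernumber: "M1" },
          schema: valueSchemaObject, // pass the schema object here
          schemaType: SCHEMA_TYPE_AVRO,
        }),
      },
    ],
  });
}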


Hi @mostafa, thanks for the clarification. I have another query.
I have an Avro schema in the format below:

{
  "type": "r",
  "namespace": "co",
  "name": "Serv",
  "fields": [
    {
      "name": "servi",
    },
    {
      "name": "pr",
      "type": [
        {
          "type": "record",
          "namespace": "co",
          "name": "Pro",
          "fields": [
            {
              "name": "pr",
              "doc": "An",
              "type": [ ],
              "default": null
            },
          
           

Meanwhile, in xk6-kafka I see schemas supported in the format below:

	avroSchema := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}]}`

In my case I have nested fields and array sections; the above is just a small part of the big schema. Can we use this in xk6-kafka?

@Shabd

This is just an example. Your schema can be anything you choose, not necessarily the example I provided.
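For instance, a nested record with an array field is declared and serialized the same way. A minimal sketch with made-up names, assuming schemaReg is configured as before:

const nestedSchema = JSON.stringify({
  type: "record",
  name: "Serv",
  namespace: "co",
  fields: [
    { name: "servi", type: "string" },
    {
      name: "pr",
      type: {
        type: "record",
        name: "Pro",
        fields: [{ name: "codes", type: { type: "array", items: "string" } }],
      },
    },
  ],
});

const nestedSchemaObject = schemaReg.createSchema({
  subject: "serv-value", // hypothetical subject name
  schema: nestedSchema,
  schemaType: SCHEMA_TYPE_AVRO,
});

// The data object must match the nesting of the schema
const serialized = schemaReg.serialize({
  data: { servi: "s1", pr: { codes: ["a", "b"] } },
  schema: nestedSchemaObject,
  schemaType: SCHEMA_TYPE_AVRO,
});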