Kafka logging with certificates from Vault PKI

Hi all!
I would like to know if it is possible to send messages to the producer in k6 using logging with vault certificates in .pem format. I need to connect to the producer and write messages to the topic, but authorization happens with certificates. K6 can do that? If yes, then I will be grateful if anyone has an example of a connection.
Thanks.

Hi @volfagnt,

It doesn’t currently support SSL certs. Please keep an eye on this issue or help contribute to it, which is much appreciated. :slight_smile:

The TLS support is already available, as documented in the JS API docs.

Hi! I saw that changes were added to work with certificates. Can you tell if it only accepts certificates as a file or can it work with them as a string?
A have some errors with ssl and certs in .pem format

This Errors when i try create topic:

time=“2022-06-18T16:10:31+03:00” level=error msg=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})” error=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})”
time=“2022-06-18T16:10:31+03:00” level=error msg=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})” error=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})”
time=“2022-06-18T16:10:31+03:00” level=error msg=“Failed to create dialer., OriginalError: %!w(*fmt.wrapError=&{failed to dial: failed to open connection to [kafka-test-1:9093,kafka-test-2:9093,kafka-test-3:9093]:9092: dial tcp: lookup kafka-test-1:9093,kafka-test-2:9093,kafka-test-3:9093: no such host 0xc0026803c0})” error=“Failed to create dialer., OriginalError: %!w(*fmt.wrapError=&{failed to dial: failed to open connection to [kafka-test-1:9093,kafka-test-2:9093,kafka-test-3:9093]:9092: dial tcp: lookup kafka-test-1:9093,kafka-test-2:9093,kafka-test-3:9093: no such host 0xc0026803c0})”
time=“2022-06-18T16:10:31+03:00” level=error msg=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})” error=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})”

And when I use writer:
time=“2022-06-18T16:10:31+03:00” level=error msg=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})” error=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})”
time=“2022-06-18T16:10:31+03:00” level=error msg=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})” error=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})”
time=“2022-06-18T16:10:31+03:00” level=error msg=“Failed to create dialer., OriginalError: %!w(*fmt.wrapError=&{failed to dial: failed to open connection to [kafka-test-1:9093,kafka-test-2:9093,kafka-test-3:9093]:9092: dial tcp: lookup kafka-test-1:9093,kafka-test-2:9093,kafka-test-3:9093: no such host 0xc0026803c0})” error=“Failed to create dialer., OriginalError: %!w(*fmt.wrapError=&{failed to dial: failed to open connection to [kafka-test-1:9093,kafka-test-2:9093,kafka-test-3:9093]:9092: dial tcp: lookup kafka-test-1:9093,kafka-test-2:9093,kafka-test-3:9093: no such host 0xc0026803c0})”
time=“2022-06-18T16:10:31+03:00” level=error msg=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})” error="Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})"time=“2022-06-18T16:10:31+03:00” level=error msg=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})” error=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})”
time=“2022-06-18T16:10:31+03:00” level=error msg=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})” error=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})”
time=“2022-06-18T16:10:31+03:00” level=error msg=“Failed to create dialer., OriginalError: %!w(*fmt.wrapError=&{failed to dial: failed to open connection to [kafka-test-1:9093,kafka-test-2:9093,kafka-test-3:9093]:9092: dial tcp: lookup kafka-test-1:9093,kafka-test-2:9093,kafka-test-3:9093: no such host 0xc0026803c0})” error=“Failed to create dialer., OriginalError: %!w(*fmt.wrapError=&{failed to dial: failed to open connection to [kafka-test-1:9093,kafka-test-2:9093,kafka-test-3:9093]:9092: dial tcp: lookup kafka-test-1:9093,kafka-test-2:9093,kafka-test-3:9093: no such host 0xc0026803c0})”
time=“2022-06-18T16:10:31+03:00” level=error msg=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})” error=“Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character ‘o’ looking for beginning of value 2})”

My code example:
const saslConfig = {
algorithm: “sasl_ssl”,
};

// TLS config is optional
const tlsConfig = {
// Enable/disable TLS (default: false)
enableTLS: true,
// Skip TLS verification if the certificate is invalid or self-signed (default: false)
insecureSkipTLSVerify: false,
// Possible values: “TLSv1.0”, “TLSv1.1”, “TLSv1.2” (default), “TLSv1.3”
minVersion: “TLSv1.0”,
// Only needed if you have a custom or self-signed certificate and keys
clientCertPem: “./client.pem”,
clientKeyPem: “./client-key.pem”,
serverCaPem: “./ca.pem”,
};

const [producer, _writerError] = writer(bootstrapServers, kafkaTopic, saslConfig, tlsConfig);
function getRandomArbitrary(min, max) {
return Math.random() * (max - min) + min;
}
for (let index = 0; index < 1; index++) {
var nf_k6_key = new Date().toISOString().slice(0, 10);
var currentDate = new Date().toISOString().slice(0, 19).replace(‘T’, ’ ');
var itm = getRandomArbitrary(1111111,7777777);
let messages = [
{
key: JSON.stringify({
correlationId: nf_k6_key + index,
}),
value: JSON.stringify({
“ITEMNO”: itm,
“SITEID”: “BLA-BLA.COM”,
“BUSPRODUCTID”: “”,
“CRMPRODUCTID”: “”,
“BUSSECTIONID”: “”,
“CRMSECTIONID”: “”,
“BODY”: {
“NAME”: " A4Tech Fstyler FB10C (fb10c matcha green NT)",
“GROUPCODE”: “PS020406”,
“EAN”: “”,
“CODE”: “www.bla-bla.com/a4tech-fb10c-” + itm
}
}),
headers: {
createdatutc: currentDate,
createdby: “SRV_NAV”,
aggregatetype:“40”,
},
},
];

     let error = produce(producer, messages);
     check(error, {
         "Messages are sent": (err) => err == undefined,
     });
 }

}

@volfagnt It needs filenames. Which version of xk6-kafka are you using?

I made a few fixes very recently (below), which need to be built for testing because I haven’t released that code yet, due to the current release process. Also, the latest API changes are available here.

Hi. I find fix and build version and i have new error:
level=error msg=“x509: certificate relies on legacy Common Name field, use SANs instead”
What’s wrong? In my case common name is required field.
I take certificates from vault in pem format.

@volfagnt The error is probably a security issue, and you have two options, as stated here:

  1. Re-sign your certificate and add subjectAltName.
  2. Temporarily disable it when building xk6-kafka with xk6 by setting this environment variable GODEBUG="x509ignoreCN=0":
git clone git@github.com:mostafa/xk6-kafka.git && cd xk6-kafka
GODEBUG="x509ignoreCN=0" xk6 build --with github.com/mostafa/xk6-kafka@latest=.

Please let me know if it worked or not.

I’m using your add-on as a local folder. And when I try to build I get an error:
./xk6 build GODEBUG=“x509ignoreCN=0” --with github.com/mostafa/xk6-kafka=“E:/some_po/k6-v0.38.3-windows/k6kafka/k6mus/main/xk6-kafka”
2022/06/20 13:09:22 [INFO] Temporary folder: C:\Users\volfg\AppData\Local\Temp\buildenv_2022-06-20-1309.426519271
2022/06/20 13:09:22 [INFO] Initializing Go module
2022/06/20 13:09:22 [INFO] exec (timeout=10s): C:\Program Files\Go\bin\go.exe mod init k6
go: creating new go.mod: module k6
2022/06/20 13:09:23 [INFO] Replace GitHub - mostafa/xk6-kafka: k6 extension to load test Apache Kafka with support for various serialization formats, SASL, TLS, compression, Schema Registry client and beyond => E:/some_po/k6-v0.38.3-windows/k6kafka/k6mus/main/xk6-kafka
2022/06/20 13:09:23 [INFO] exec (timeout=0s): C:\Program Files\Go\bin\go.exe mod edit -replace github.com/mostafa/xk6-kafka=E:/some_po/k6-v0.38.3-windows/k6kafka/k6mus/main/xk6-kafka
2022/06/20 13:09:23 [INFO] exec (timeout=0s): C:\Program Files\Go\bin\go.exe mod tidy -compat=1.17
go: warning: “all” matched no packages
2022/06/20 13:09:23 [INFO] Pinning versions
2022/06/20 13:09:23 [INFO] exec (timeout=0s): C:\Program Files\Go\bin\go.exe mod tidy -compat=1.17
go: found GitHub - mostafa/xk6-kafka: k6 extension to load test Apache Kafka with support for various serialization formats, SASL, TLS, compression, Schema Registry client and beyond in GitHub - mostafa/xk6-kafka: k6 extension to load test Apache Kafka with support for various serialization formats, SASL, TLS, compression, Schema Registry client and beyond v0.0.0-00010101000000-000000000000
go: finding module for package GitHub - fsnotify/fsnotify: Cross-platform file system notifications for Go.
go: finding module for package gopkg.in/tomb.v1
go: found gopkg.in/tomb.v1 in gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
go: found GitHub - fsnotify/fsnotify: Cross-platform file system notifications for Go. in GitHub - fsnotify/fsnotify: Cross-platform file system notifications for Go. v1.5.4
2022/06/20 13:09:24 [INFO] Writing main module: C:\Users\volfg\AppData\Local\Temp\buildenv_2022-06-20-1309.426519271\main.go
2022/06/20 13:09:24 [INFO] exec (timeout=0s): C:\Program Files\Go\bin\go.exe mod edit -require go.k6.io/k6@GODEBUG=x509ignoreCN=0
2022/06/20 13:09:24 [INFO] exec (timeout=0s): C:\Program Files\Go\bin\go.exe mod tidy -compat=1.17
go: errors parsing go.mod:
C:\Users\volfg\AppData\Local\Temp\buildenv_2022-06-20-1309.426519271\go.mod:31:2: require go.k6.io/k6: version “GODEBUG=x509ignoreCN=0” invalid: unknown revision GODEBUG=x509ignoreCN=0
2022/06/20 13:09:26 [INFO] Cleaning up temporary folder: C:\Users\volfg\AppData\Local\Temp\buildenv_2022-06-20-1309.426519271
2022/06/20 13:09:26 [FATAL] exit status 1
When I use your example I get:

  • GODEBUG=“x509ignoreCN=0” xk6 build --with github.com/mostafa/xk6-kafk
  •   + CategoryInfo          : ObjectNotFound: (GODEBUG=x509ignoreCN=0:String) [], CommandNotFoundException
      + FullyQualifiedErrorId : CommandNotFoundException

@volfagnt
You are making a mistake by passing the environment variable as a command. Look at the examples here. This is an excellent resource for learning about global variables, a.k.a. environment variables, in BASH, if you are running xk6 on a GNU/Linux machine.

Update:
I just noticed you are running xk6 in PowerShell, which means that you need to read this, that is, you either set the environment variable in the System Properties -> Advanced -> Environment Variables -> System variables (and restarting your shell) or you can pass them directly to pwsh.exe while invoking it like this:

pwsh -Command { $env:GODEBUG="x509ignoreCN=0"; .\xk6 build --with github.com/mostafa/xk6-kafka@latest=. }

Thx, so build new one k6 like that - $Env:GODEBUG =‘x509ignoreCN=0’; ./xk6 build v0.38.3 --with github.com/mostafa/xk6-kafka@v0.11.0
I have created a System variable
изображение
And I am passing the certificates as a string, but still get errors(((


My conf

Consumer/producer :
time=“2022-06-21T02:19:44+03:00” level=info msg=“Cannot process TLS config” error=“Client certificate file not found.”
ERRO[0001] panic: runtime error: invalid memory address or nil pointer dereference
goroutine 68 [running]:
runtime/debug.Stack()
runtime/debug/stack.go:24 +0x65
go.k6.io/k6/js/common.RunWithPanicCatching.func1()
go.k6.io/k6@v0.38.3/js/common/util.go:101 +0x191
panic({0xcc3bc0, 0x1b3fd60})
runtime/panic.go:838 +0x207
github.com/dop251/goja.AssertFunction.func1.1()
github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/runtime.go:2300 +0x6e
panic({0xcc3bc0, 0x1b3fd60})
runtime/panic.go:838 +0x207
github.com/dop251/goja.(*vm).try.func1()
github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/vm.go:537 +0x635
panic({0xcc3bc0, 0x1b3fd60})
runtime/panic.go:838 +0x207
github.com/segmentio/kafka-go.(*Reader).activateReadLag(0xc000d61d40?)
github.com/segmentio/kafka-go@v0.4.31/reader.go:1142 +0x18
github.com/segmentio/kafka-go.(*Reader).FetchMessage(0x0, {0x1176810, 0xc000d30940})
github.com/segmentio/kafka-go@v0.4.31/reader.go:814 +0x78
github.com/segmentio/kafka-go.(*Reader).ReadMessage(0x0, {0x1176810, 0xc000d30940})
github.com/segmentio/kafka-go@v0.4.31/reader.go:791 +0x9c
github.com/mostafa/xk6-kafka.(*Kafka).consumeInternal(0xc001c90000, _, _, {{{0x0, 0x0}, {0x0, 0x0}, {0x0, 0x0}, 0x0}, …}, …)
github.com/mostafa/xk6-kafka@v0.11.0/consumer.go:120 +0x452
github.com/mostafa/xk6-kafka.(*Kafka).Consume(0x7?, 0x0?, 0xc000ee06c0?, {0x0?, 0x4?}, {0x0?, 0x2?})
github.com/mostafa/xk6-kafka@v0.11.0/consumer.go:61 +0x65
reflect.Value.call({0xdc4120?, 0xc001c90000?, 0x23fd9cd0a28?}, {0xe35efe, 0x4}, {0xc000ff7740, 0x4, 0xc001317210?})
reflect/value.go:556 +0x845
reflect.Value.Call({0xdc4120?, 0xc001c90000?, 0x1955a90?}, {0xc000ff7740, 0x4, 0x4})
reflect/value.go:339 +0xbf
github.com/dop251/goja.(*Runtime).wrapReflectFunc.func1({{0x117f078, 0x1bb77a0}, {0xc000ee63c0, 0x2, 0x12}})
github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/runtime.go:1886 +0x1c5
github.com/dop251/goja.(*vm)._nativeCall(0xc0018b2400, 0xc001228280, 0x2)
github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/vm.go:2727 +0x217
github.com/dop251/goja.call.exec(0x7?, 0xc0018b2400)
github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/vm.go:2699 +0x95b
github.com/dop251/goja.(*vm).run(0xc0018b2400)
github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/vm.go:408 +0x8b
github.com/dop251/goja.(*baseJsFuncObject)._call(0xc000cc0000, {{0x117f078, 0x1bb77a0}, {0xc002c18de0, 0x1, 0x1}}, {0x0?, 0x0}, {0x117f078, 0x1bb77a0})
github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/func.go:193 +0x3ff
github.com/dop251/goja.(*baseJsFuncObject).call(...)
github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/func.go:203
github.com/dop251/goja.(*baseJsFuncObject).Call(0x8?, {{0x117f078, 0x1bb77a0}, {0xc002c18de0, 0x1, 0x1}})
github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/func.go:156 +0xc7
github.com/dop251/goja.AssertFunction.func1.2()
github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/runtime.go:2305 +0x74
github.com/dop251/goja.(*vm).try(0xc00057b8c8?, 0x2d805?)
github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/vm.go:545 +0x176
github.com/dop251/goja.AssertFunction.func1({0x117f078?, 0x1bb77a0?}, {0xc002c18de0?, 0x1?, 0x1?})
github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/runtime.go:2304 +0x10c
go.k6.io/k6/js.(*VU).runFn.func1.1()
go.k6.io/k6@v0.38.3/js/runner.go:800 +0x43
go.k6.io/k6/js/eventloop.(*EventLoop).Start(0xc00018a9b0, 0xc000ed3bf0)
go.k6.io/k6@v0.38.3/js/eventloop/eventloop.go:112 +0x177
go.k6.io/k6/js.(*VU).runFn.func1()
go.k6.io/k6@v0.38.3/js/runner.go:799 +0xee
go.k6.io/k6/js/common.RunWithPanicCatching({0x117ff20?, 0xc0002e5ab0?}, 0x23fff48984a?, 0xc00057bb90?)
go.k6.io/k6@v0.38.3/js/common/util.go:105 +0x87go.k6.io/k6/js.(*VU).runFn(0xc000cd6000, {0x1176810, 0xc000d30940}, 0xa0?, 0xc00151a570, 0xc002c18dd0, {0xc002c18de0, 0x1, 0x1})
go.k6.io/k6@v0.38.3/js/runner.go:798 +0x2a5
go.k6.io/k6/js.(*ActiveVU).RunOnce(0xc000ce3040)
go.k6.io/k6@v0.38.3/js/runner.go:750 +0x40f
go.k6.io/k6/lib/executor.getIterationRunner.func1({0x11768b8, 0xc000c58c90}, {0x116d300?, 0xc000ce3040?})
go.k6.io/k6@v0.38.3/lib/executor/helpers.go:145 +0x64
go.k6.io/k6/lib/executor.(*activeVUPool).AddVU.func1()
go.k6.io/k6@v0.38.3/lib/executor/ramping_arrival_rate.go:546 +0xd1
created by go.k6.io/k6/lib/executor.(*activeVUPool).AddVU
go.k6.io/k6@v0.38.3/lib/executor/ramping_arrival_rate.go:540 +0x12e

@volfagnt The certificates should be passed as filenames, not strings. Please have a look at this unit test:

Hi! All three certs can be .pem??

@volfagnt According to the following, they must be PEM-encoded:

@volfagnt Keep an eye on this issue and possibly upvote it, so I know if you want to have JKS instead of PEM because I know what a hassle it is to convert them.

I had such a problem on jmeter, if you add bouncycastle lib, then you can bypass jks on spring and java.|
Maybe there is a solution for Go like bouncycastle.

Now for my scenario, I added the ability to create a local file with certificates in .pem from response. But again began to receive an old problem.


My build was like that - $Env:GODEBUG =‘x509ignoreCN=0’; ./xk6 build v0.38.3 --with github.com/mostafa/xk6-kafka@v0.11.0 --with github.com/avitalique/xk6-file@latest
OS Windows Powershell.

@volfagnt According to the following link, the feature flag is removed in Go 1.17, so you can no longer pass the env-var to xk6 and Go while compiling xk6-kafka. You must re-sign your certificate and set subjectAltName.