Kafka Connect Snowflake Connector offset mismatch fix

How to fix an offset mismatch and get rid of the error: 'Ignoring invalid task provided offset…' in the Kafka Connect Snowflake connector.

If you're here, your Kafka Connect cluster most likely stopped working and is not inserting any new messages into Snowflake. You checked the logs and saw this WARN message:

[2024-04-08 16:04:23,507] WARN WorkerSinkTask{id=my-sf-connector}
Ignoring invalid task provided offset my-example-topic-1/OffsetAndMetadata{offset=33500, leaderEpoch=null, metadata=''}
-- not yet consumed, taskOffset=33500 currentOffset=105 (org.apache.kafka.connect.runtime.WorkerSinkTask:426)

If you got this message, the most likely causes I can think of are that someone recreated the topic, the connector was pointed at a different Kafka cluster, or you migrated to a new cluster without copying the __consumer_offsets topic. In the log above, taskOffset=33500 is the offset the connector wants to continue from, while currentOffset=105 is how far the consumer has actually read in the current topic, so the connector believes it is far ahead of the data that exists.
Strangely enough, this is only logged at WARN level, even though it means there is an offset mismatch and messages from that topic are not being ingested.
This can be dangerous: you usually don't have alerts set up for that log level, and if your topics have a short retention period, there's a high risk of losing your precious messages.
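
Since the warning is so easy to miss, one option is to scan the worker log for it yourself. What follows is only a minimal sketch, not something that ships with Kafka Connect: find_offset_mismatches is a name I made up, and it assumes the log line format shown above.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Kafka Connect): scan a worker log for the
# offset mismatch warning and print "taskOffset currentOffset" for each hit.
# Assumes the log format shown in the WARN message above.
find_offset_mismatches() {
  local log_file="$1"
  grep -E 'taskOffset=[0-9]+ currentOffset=[0-9]+' "$log_file" \
    | sed -E 's/.*taskOffset=([0-9]+) currentOffset=([0-9]+).*/\1 \2/'
}

# Demo with the line from the log above, written to a temporary file.
sample_log="$(mktemp)"
echo "-- not yet consumed, taskOffset=33500 currentOffset=105 (org.apache.kafka.connect.runtime.WorkerSinkTask:426)" > "$sample_log"
find_offset_mismatches "$sample_log"   # prints: 33500 105
rm -f "$sample_log"
```

Any output at all means a task is asking for an offset the consumer has not reached, which is a reasonable condition to hook into whatever alerting you already have.
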
If you're like me, a self-proclaimed Kafka Connect expert with no proof or certification, you probably thought that this is a bit inconvenient, but an easy fix.
I just need to reset the offset for the specific consumer group and recreate the connector. Right?? It's so simple!
So that's exactly what I did.

Trying to fix the issue on the Kafka side

  1. I first deleted the Snowflake connector by sending a DELETE request to the Kafka Connect connector endpoint
curl -X DELETE http://localhost:8083/connectors/MY_CONNECTOR_NAME
  2. Then I had to stop all consumers in that specific consumer group (consumer.group.id). You can't change offsets while a group is ACTIVE/STABLE; the group must be INACTIVE, meaning it has no running members. If you have a consumer for the DLQ (dead letter queue), don't forget to turn that one off too.
    To check if there are active consumers, you can use the kafka-consumer-groups.sh script that comes with every Kafka installation
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --command-config config/client-ssl.properties --group ExampleFooGroup --describe
  3. After all the consumers were stopped and the group was in an INACTIVE state, I used the same script to reset the offsets
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --command-config config/client-ssl.properties --reset-offsets --to-earliest --group ExampleFooGroup --topic MyExampleTopic --execute
    This only resets offsets for the MyExampleTopic topic. If you want to reset offsets for all topics, use the --all-topics flag instead of --topic.
  4. I checked the offsets again to make sure the reset happened and recreated the connector
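
The offset reset above can be previewed before it is applied, because kafka-consumer-groups.sh accepts --dry-run in place of --execute and then only prints the offsets it would set. Here is a small sketch of a wrapper that defaults to the preview. The function name reset_offsets and the KAFKA_CG, BOOTSTRAP and COMMAND_CONFIG variables are my own inventions for this example, and the defaults simply mirror the paths used in the commands above.

```shell
#!/usr/bin/env bash
# Sketch of a wrapper around the reset command used above.
# KAFKA_CG, BOOTSTRAP and COMMAND_CONFIG are assumed names and values;
# adjust them to match your own installation.
KAFKA_CG="${KAFKA_CG:-./bin/kafka-consumer-groups.sh}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"
COMMAND_CONFIG="${COMMAND_CONFIG:-config/client-ssl.properties}"

# reset_offsets <group> <topic> [--dry-run|--execute]
# Defaults to --dry-run so nothing changes unless you ask for it.
reset_offsets() {
  local group="$1" topic="$2" mode="${3:---dry-run}"
  "$KAFKA_CG" --bootstrap-server "$BOOTSTRAP" --command-config "$COMMAND_CONFIG" \
    --reset-offsets --to-earliest --group "$group" --topic "$topic" "$mode"
}
```

Calling reset_offsets ExampleFooGroup MyExampleTopic shows the planned offsets, and adding --execute as a third argument applies them.
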

After doing all the steps, the issue was still there and the connector was still looking at the old offsets.
Since I've set up and deployed several Kafka Connect clusters and am familiar with a lot of the configuration properties, especially in distributed mode, I was almost certain that the offsets were stored in the topic configured via the offset.storage.topic property.
I inspected the topic and, to my surprise, it was empty. I double-checked everything in case I had misconfigured or missed something, but everything looked good. After some research, I found a Google Groups conversation which says that offset.storage.topic is used only for source connectors.
Sink connectors store their offsets in the regular __consumer_offsets topic, alongside the offsets of every other consumer group in the cluster.

I was confused since I read the documentation for that property a few times and I couldn't recall it mentioning that the property was only for source connectors. I went to check the docs again and there was no mention that the topic is for source connectors only. Not a skill issue, just missing info in the docs; ego restored, let's continue.

A similar detail that is mentioned in the docs but is easy to overlook: the consumer group of a sink connector is not the worker's group.id. It is derived from consumer.group.id, and if you haven't overridden it, the default group name is connect-<connector name>.

Anyways, I recreated the topic just to be sure but the issue was still there. So the only idea I had left was that the offsets were stored somewhere in Snowflake.

Trying to fix the issue on the Snowflake side

I opened the Snowsight console and started inspecting the tables mapped to the topics that were causing the offset mismatch. If you are not familiar with how the Snowflake connector stores data: it creates two columns in the specified table, one for the metadata called RECORD_METADATA and one for the messages called RECORD_CONTENT. Since RECORD_CONTENT holds just the deserialized messages, the only interesting column left was RECORD_METADATA.
I inspected the column and interestingly enough, the offset was stored in the metadata!
Okay, okay, we're onto something here... or at least I thought...
My idea was that if I clear/empty those columns, then the offsets will be gone and the connector will start from the beginning. So I did just that.

  1. I backed up the table in case something went wrong
create table MY_TABLE_BACKUP clone MY_TABLE;
  2. Cleared the RECORD_METADATA column aka set it to NULL
update MY_TABLE set RECORD_METADATA = NULL;
  3. Recreated the Snowflake connector
  4. Profit???

Having already spent more time than I planned on this, I was eagerly waiting for the connector to come up, only to see that nothing had changed: the issue was still there. 😢
I didn't give up; I had one more idea to try!

Last try and final fix


UPDATE: Big thanks to Niv Atzmon for reaching out and letting me know that there is a better fix than the one I originally found. I won't delete the one I found, but I will label it as "workaround". If the final fix doesn't work for you, you can always fall back to the workaround. Feel free to skip to the "Final fix" section and try that one first.


Workaround

Since I tried almost all the ideas I had, the only thing left I could think of was to recreate the Snowflake tables. I couldn't afford to lose the data so I had to also create backups for all the tables.

  1. I first backed up all tables that had the offset mismatch issue
create table MY_TABLE_BACKUP clone MY_TABLE;
create table MY_TABLE_2_BACKUP clone MY_TABLE_2;
  2. Dropped the old tables
drop table MY_TABLE;
drop table MY_TABLE_2;
  3. Cloned the backup tables into new tables that had the same names as the ones that were having offset issues
create table MY_TABLE clone MY_TABLE_BACKUP;
create table MY_TABLE_2 clone MY_TABLE_2_BACKUP;
  4. Restarted the connector

Surprisingly enough, this actually worked! 🎉🎉

Final fix

What if I told you that there is a fix that doesn't require dropping your tables and resetting the offset to 0, but instead lets you set the offset to any value you want?
Sounds great, but it's not that easy to find.
If you dig deep enough, you will stumble upon system functions in Snowflake and there is one function that is especially interesting to us. The function is called SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN. If you read the description of the function, it says "Updates the offset token for a particular channel used by Snowpipe Streaming with a new offset token." which means that the real location of the offset is in the channel. Now that I think about it, it actually makes sense...

So to finally fix the issue and reset/set the offset, the only thing left to do is to run the function with your table name, channel name and desired offset.

SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN('<dbName>.<schemaName>.<tableName>', '<channelName>', '<new_offset_token>');

To reset the offset, set <new_offset_token> to -1.

If you're not sure what your channel is called, you can run SHOW CHANNELS:

SHOW CHANNELS;
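
Once you have the channel names, typing the call by hand for each one gets tedious if several channels need the same treatment. Below is a small sketch that only prints the statements so you can review them before running anything. offset_token_sql is a hypothetical helper I made up for this post, and I'm assuming the function is invoked through SELECT, the way Snowflake system functions usually are.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print one offset-token update statement per channel.
# Usage: offset_token_sql <db.schema.table> <new_offset_token> <channel>...
# It only prints SQL; paste the output into Snowsight or your SQL client.
offset_token_sql() {
  local table="$1" token="$2"
  shift 2
  local channel
  for channel in "$@"; do
    printf "SELECT SYSTEM\$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN('%s', '%s', '%s');\n" \
      "$table" "$channel" "$token"
  done
}
```

For example, offset_token_sql 'MY_DB.MY_SCHEMA.MY_TABLE' -1 CHANNEL_A CHANNEL_B prints two statements that reset both channels. CHANNEL_A and CHANNEL_B are placeholders; use the names that SHOW CHANNELS returns for your table.
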

Keep in mind that to edit the channel, you will need to use a role that has write permissions on the targeted table.

Thanks again to Niv Atzmon and the Snowflake team for a more elegant solution.