How to Fix High Latency in Kafka Connect JDBC Source Connectors

High latency in a Kafka Connect JDBC source connector usually manifests as a widening gap between when a record is written to your database and when it appears in a Kafka topic. You might notice your streaming pipeline feels "laggy," or your downstream consumers are waiting minutes for data that should arrive in seconds. This bottleneck often stems from how the connector queries the source database, leading to inefficient resource usage and slow data throughput.

To resolve these performance issues, you must move away from expensive bulk polling toward efficient incremental tracking. By ensuring your database schema supports the connector’s query patterns with proper indexing and by fine-tuning batch sizes, you can reduce ingestion latency from minutes to seconds. This guide focuses on identifying the root causes of JDBC slowness and implementing the specific configuration changes needed to restore high-speed data flow.

TL;DR — Switch from bulk to incrementing or timestamp+incrementing mode. Add a B-tree index to your offset column (ID or modified_at). Increase batch.max.rows to 5000+ and reduce poll.interval.ms to 100-500ms for near real-time performance.

Symptoms of JDBC Source Latency

💡 Analogy: Imagine a librarian who checks the entire library every hour to see if a single new book was added. As the library grows to millions of books, the librarian spends the whole hour just walking the aisles, never finishing the check before the next one starts. That is exactly what happens when a JDBC connector performs a full table scan on a large database.

The most common symptom of high latency is lag, but on the source side, where standard "Consumer Lag" dashboards cannot see it. Unlike sink connectors, source connectors have no consumer group, so their lag does not appear in ordinary Kafka offset metrics. Instead, you will see a significant time difference between the timestamp stored in the record itself (for example, its updated_at column) and the timestamp Kafka assigns at ingest. If your data is 10 minutes old by the time it hits the topic, the connector is likely struggling to keep up with the database's write volume.

Another major symptom is high CPU or I/O wait on your source relational database (RDBMS). When the JDBC connector runs inefficient queries, the database engine has to work harder to return results. You might see the same SELECT query appearing repeatedly in your database's "slow query log" or "process list." In extreme cases, the connector might even cause table locks or deadlocks if the isolation levels are not configured correctly, further degrading the performance of your primary application.
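To confirm this from the database side, inspect what the connector is actually running. The queries below are illustrative diagnostics, one for PostgreSQL and one for MySQL, using those systems' built-in views:

```sql
-- PostgreSQL: show currently running statements and how long they have run
SELECT pid, now() - query_start AS runtime, state, query
FROM pg_stat_activity
WHERE state <> 'idle'
ORDER BY runtime DESC;

-- MySQL: equivalent view of in-flight statements
SHOW FULL PROCESSLIST;
```

If the connector's SELECT appears here with a long runtime on every poll, the query itself is the bottleneck rather than the Connect worker.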

Finally, check your Kafka Connect worker logs. If you see frequent warnings about Poll interval exceeded or if the connector seems to hang for long periods before emitting a batch of records, the connector is likely blocked by the database response time. This is a clear indicator that the query being sent to the DB is too heavy for the current schema to handle efficiently.

Root Causes of Slow Data Ingestion

1. Bulk Polling Instead of Incremental Tracking

The JDBC Source Connector supports several mode settings. The bulk mode is the most common cause of high latency in growing datasets. In bulk mode, the connector executes a SELECT * FROM table query every time it polls. If your table has 10 million rows, the connector tries to pull all 10 million rows into memory and send them to Kafka every few seconds. This is inherently unscalable and will eventually crash the connector or the database.
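The difference is easiest to see in the SQL each mode issues on every poll. These are illustrative query shapes only; the exact SQL the connector generates varies by dialect and connector version:

```sql
-- bulk mode: the entire table, on every single poll
SELECT * FROM orders;

-- incrementing mode: only rows beyond the last recorded offset
SELECT * FROM orders WHERE id > ? ORDER BY id ASC;
```

The second query returns only new rows and, with an index on id, stays fast regardless of how large the table grows.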

2. Missing Indexes on Offset Columns

Even if you use incrementing or timestamp modes, the connector still executes a SQL query with a WHERE clause, such as SELECT * FROM table WHERE id > ? ORDER BY id ASC. If the column id (your incrementing column) or updated_at (your timestamp column) is not indexed, the database must perform a "Full Table Scan." As the table grows, the time it takes to find the "next" rows increases linearly, leading to higher and higher latency over time.
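You can verify whether the polling query uses an index by checking its execution plan. A sketch for PostgreSQL, assuming a hypothetical orders table with an updated_at column:

```sql
EXPLAIN ANALYZE
SELECT * FROM orders
WHERE updated_at > '2024-01-01 00:00:00'
ORDER BY updated_at ASC
LIMIT 100;
```

If the plan shows Seq Scan rather than Index Scan, the database is paying full-table-scan cost on every poll. (In MySQL, run EXPLAIN and watch for type: ALL.)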

3. Improper Batching Configuration

The batch.max.rows setting controls how many records the connector pulls from the database in a single request. If this number is too small (e.g., the default 100), the connector spends more time performing network handshakes and query overhead than actually moving data. Conversely, if it is too large without enough heap memory, the Kafka Connect worker may experience long Garbage Collection (GC) pauses, which halts all data processing.

4. Database Contention and Locking

If your source database is under heavy load from your main application, the JDBC connector's queries might be queued behind other transactions. Furthermore, if the connector is using a strict transaction isolation level, it might be waiting for locks to be released on the rows it is trying to read. This is particularly common in PostgreSQL and SQL Server environments where SELECT queries can sometimes be blocked by long-running UPDATE or DELETE operations.
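On PostgreSQL you can check whether the connector's session is being blocked by another transaction. A diagnostic sketch using the built-in pg_blocking_pids function:

```sql
-- List sessions currently waiting on locks, and which PIDs are blocking them
SELECT pid, pg_blocking_pids(pid) AS blocked_by, state, query
FROM pg_stat_activity
WHERE cardinality(pg_blocking_pids(pid)) > 0;
```

If the connector's PID shows up here during polls, lock contention rather than query cost is driving the latency.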

Steps to Fix High Latency

Step 1: Switch to Incremental Polling Modes

Update your connector configuration to use incrementing or timestamp+incrementing. This ensures the connector only asks for data it hasn't seen yet. Avoid bulk mode for any table larger than a few thousand rows. The timestamp+incrementing mode is the most robust, as it handles both new records and updates while using the ID to resolve collisions if multiple records share the same timestamp.


{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "timestamp+incrementing",
  "incrementing.column.name": "id",
  "timestamp.column.name": "updated_at",
  "validate.non.null": "true"
}

Step 2: Apply Database Indexing

Verify that the columns specified in incrementing.column.name or timestamp.column.name have an index. In PostgreSQL or MySQL, a standard B-tree index is usually sufficient. Without this, your database performs an O(N) search for every poll, which is the primary driver of JDBC latency.


-- Example for a PostgreSQL or MySQL table
CREATE INDEX idx_table_updated_at ON your_table_name (updated_at);
CREATE INDEX idx_table_id ON your_table_name (id);

⚠️ Common Mistake: Do not use a column with low cardinality (like a "status" column) as your incrementing or timestamp column. The connector needs a strictly increasing value to track its position accurately. If you use a timestamp, ensure the database server and the Kafka Connect worker are using the same timezone (preferably UTC).
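If your timestamps are stored without timezone information, you can tell the connector how to interpret them via the db.timezone setting (a Confluent JDBC connector property that accepts a standard Java TimeZone ID). The fragment below is a sketch; timestamp.delay.interval.ms can be raised slightly if late-committing transactions cause rows to be skipped:

```json
{
  "db.timezone": "UTC",
  "timestamp.delay.interval.ms": "0"
}
```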

Step 3: Tune Batch and Poll Settings

To maximize throughput, you need to find the balance between how often you poll and how much data you fetch. Increase batch.max.rows to handle larger bursts of data and decrease poll.interval.ms to check for new data more frequently. For a high-traffic table, 5,000 rows per batch is a common starting point.


{
  "batch.max.rows": "5000",
  "poll.interval.ms": "500",
  "fetch.size": "5000"
}

Note: fetch.size is a hint to the JDBC driver about how many rows to fetch over the network in a single round trip. Setting it equal to batch.max.rows often reduces the number of round trips between the Connect worker and the database.

Verifying the Fix

After applying the configuration changes and adding indexes, you should verify the improvements using the following commands and metrics. First, check the current status of your connector to ensure it hasn't failed due to a configuration error.


# Check connector status via REST API
curl -s http://localhost:8083/connectors/your-jdbc-source/status | jq

Look for the source-record-poll-rate and poll-batch-avg-time-ms metrics exposed via JMX under the source-task-metrics MBean. In a healthy, low-latency setup, the average poll batch time should be under 100ms. If it remains high (e.g., >1000ms), the database query is still the bottleneck.

Next, use a tool like kafka-console-consumer with the --property print.timestamp=true flag to observe the real-time arrival of messages. Compare the Kafka timestamp with the timestamp stored in the record itself. In a tuned system, the difference should be stable and close to your poll.interval.ms.


# Watch records arriving in Kafka
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic your-topic-name \
  --from-beginning \
  --property print.timestamp=true
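To turn the consumer output into a lag number, compare the printed CreateTime (epoch milliseconds) against the wall clock. A minimal shell sketch; the timestamp below is a made-up example you would replace with a value copied from the consumer output:

```shell
# Hypothetical CreateTime copied from the consumer output above
record_ts_ms=1718000000000

# Current wall-clock time in milliseconds
now_ms=$(( $(date +%s) * 1000 ))

# End-to-end lag in seconds
echo "lag_seconds=$(( (now_ms - record_ts_ms) / 1000 ))"
```

In a tuned system this number should stay small and stable; a value that keeps growing means the connector is falling behind.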

Preventing Future Latency Spikes

To prevent latency from creeping back as your data grows, implement automated monitoring and better schema practices. One effective strategy is to point your JDBC connector at a read replica instead of the primary database. This isolates the connector's polling load from your application traffic, and you can often optimize the replica's indexing specifically for Kafka Connect's query patterns. Keep in mind that any replication lag on the replica adds directly to your end-to-end latency, so monitor it alongside the connector metrics.

📌 Key Takeaways:

  • Never use bulk mode for production tables.
  • Indexes on the polling columns are non-negotiable for low latency.
  • Match fetch.size with batch.max.rows to optimize JDBC driver performance.
  • Use a read-replica to isolate ingestion load from primary application traffic.

Finally, consider moving to a **Change Data Capture (CDC)** tool such as Debezium if your latency requirements are sub-second or if you need to capture deletes. The JDBC Source Connector is a polling mechanism, which inherently has some delay and cannot see deleted rows. CDC, on the other hand, reads the database's transaction log (WAL, binlog, or redo log) directly, pushing changes to Kafka the moment they are committed without executing expensive SELECT queries.
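As a rough illustration, a minimal Debezium PostgreSQL source configuration looks like the sketch below. Hostnames, credentials, and topic.prefix are placeholders, and exact property names depend on your Debezium version (this sketch assumes 2.x):

```json
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "db.example.com",
  "database.port": "5432",
  "database.user": "debezium",
  "database.password": "********",
  "database.dbname": "appdb",
  "topic.prefix": "app",
  "plugin.name": "pgoutput"
}
```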

Frequently Asked Questions

Q. Why is Kafka Connect JDBC source slow even with small tables?

A. It is often due to poll.interval.ms being set too high (the default is 5 seconds) or the low default batch.max.rows of 100. Additionally, network latency between the Kafka Connect cluster and the database adds overhead if the connector performs many small fetches instead of one large batch.

Q. How to optimize JDBC Source Connector performance for large tables?

A. Use timestamp+incrementing mode with an index on both columns. Increase batch.max.rows to 5,000 or 10,000 to improve throughput per poll. If you are using a heavy query, consider creating a database View that pre-filters or pre-joins data to simplify the connector's job.
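For example, a pre-filtered view might look like the sketch below (table and column names are hypothetical); you would then point the connector at the view via table.whitelist or the query setting:

```sql
-- Hypothetical view that excludes rows the pipeline never needs
CREATE VIEW orders_for_kafka AS
SELECT id, customer_id, total, updated_at
FROM orders
WHERE status <> 'draft';
```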

Q. What is the difference between bulk and incrementing mode in Kafka Connect?

A. bulk mode fetches the entire table on every poll, which is very slow and resource-heavy. incrementing mode uses a column (like an ID) to only fetch rows with a value higher than the last seen ID. This makes incrementing mode significantly faster and more scalable for production use.
