kafka_fdw

Kafka Foreign Data Wrapper for CSV-formatted messages

Overview

| Package   | Version | Category | License    | Language |
|-----------|---------|----------|------------|----------|
| kafka_fdw | 0.0.3   | FDW      | PostgreSQL | C        |

| ID   | Extension | Bin | Lib | Load | Create | Trust | Reloc | Schema |
|------|-----------|-----|-----|------|--------|-------|-------|--------|
| 8730 | kafka_fdw | No  | Yes | No   | Yes    | No    | Yes   | -      |

Related: pgmq, mongo_fdw, redis_fdw, wrappers, multicorn, redis, hdfs_fdw, wal2json

Version

| Type | Repo   | Version | PG Ver             | Package                 | Deps |
|------|--------|---------|--------------------|-------------------------|------|
| EXT  | PIGSTY | 0.0.3   | 18, 17, 16, 15, 14 | kafka_fdw               | -    |
| RPM  | PIGSTY | 0.0.3   | 18, 17, 16, 15, 14 | kafka_fdw_$v            | -    |
| DEB  | PIGSTY | 0.0.3   | 18, 17, 16, 15, 14 | postgresql-$v-kafka-fdw | -    |

Here $v stands for the PostgreSQL major version, as in the install commands below.

| OS / PG      | PG18         | PG17         | PG16         | PG15         | PG14         |
|--------------|--------------|--------------|--------------|--------------|--------------|
| el8.x86_64   |              |              |              |              |              |
| el8.aarch64  |              |              |              |              |              |
| el9.x86_64   |              |              |              |              |              |
| el9.aarch64  |              |              |              |              |              |
| el10.x86_64  |              |              |              |              |              |
| el10.aarch64 |              |              |              |              |              |
| d12.x86_64   |              |              |              |              |              |
| d12.aarch64  |              |              |              |              |              |
| d13.x86_64   | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| d13.aarch64  | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u22.x86_64   | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u22.aarch64  | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u24.x86_64   | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u24.aarch64  | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |

Blank cells indicate that no package is available for that OS / PG combination.

Build

You can build the RPM / DEB packages for kafka_fdw using pig build:

pig build pkg kafka_fdw         # build RPM / DEB packages

Install

You can install kafka_fdw directly. First, make sure the PGDG and PIGSTY repositories are added and enabled:

pig repo add pgsql -u          # Add repo and update cache

Install the extension using pig or apt/yum/dnf:

pig ext install kafka_fdw           # Install for current active PG version
pig ext install -y kafka_fdw -v 18  # PG 18
pig ext install -y kafka_fdw -v 17  # PG 17
pig ext install -y kafka_fdw -v 16  # PG 16
pig ext install -y kafka_fdw -v 15  # PG 15
pig ext install -y kafka_fdw -v 14  # PG 14
dnf install -y kafka_fdw_18       # PG 18
dnf install -y kafka_fdw_17       # PG 17
dnf install -y kafka_fdw_16       # PG 16
dnf install -y kafka_fdw_15       # PG 15
dnf install -y kafka_fdw_14       # PG 14
apt install -y postgresql-18-kafka-fdw   # PG 18
apt install -y postgresql-17-kafka-fdw   # PG 17
apt install -y postgresql-16-kafka-fdw   # PG 16
apt install -y postgresql-15-kafka-fdw   # PG 15
apt install -y postgresql-14-kafka-fdw   # PG 14

Create Extension:

CREATE EXTENSION kafka_fdw;

Usage

Syntax:

CREATE EXTENSION kafka_fdw;
CREATE SERVER kafka_server FOREIGN DATA WRAPPER kafka_fdw
  OPTIONS (brokers 'localhost:9092');

Source: README

kafka_fdw is a foreign data wrapper that exposes Kafka messages as PostgreSQL foreign tables. The upstream README explicitly warns that the project is not yet production ready.

Server and Mapping

Define a foreign server with the Kafka broker list, then add a user mapping:

CREATE EXTENSION kafka_fdw;

CREATE SERVER kafka_server
FOREIGN DATA WRAPPER kafka_fdw
OPTIONS (brokers 'localhost:9092');

CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;
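
The brokers value is a broker list; librdkafka takes such lists in comma-separated host:port form, so a multi-node cluster can presumably be written as in the sketch below (the host and role names are illustrative, not from the upstream docs). User mappings can also be created per role instead of for PUBLIC:

CREATE SERVER kafka_cluster
FOREIGN DATA WRAPPER kafka_fdw
OPTIONS (brokers 'kafka1:9092,kafka2:9092,kafka3:9092');  -- hypothetical hosts

CREATE USER MAPPING FOR analytics_reader SERVER kafka_cluster;  -- illustrative role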

Foreign Tables

Kafka foreign tables must declare two metadata columns, one marked with partition 'true' and one marked with offset 'true'. The remaining columns describe the message payload.

CSV Messages

CREATE FOREIGN TABLE kafka_test (
    part int OPTIONS (partition 'true'),
    offs bigint OPTIONS (offset 'true'),
    some_int int,
    some_text text,
    some_date date,
    some_time timestamp
)
SERVER kafka_server
OPTIONS (
    format 'csv',
    topic 'contrib_regress',
    batch_size '30',
    buffer_delay '100'
);

For CSV, columns are mapped by position. Upstream notes that schema enforcement depends on the message writer, so strict parsing and junk-handling options matter when input quality is uncertain.
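
As a concrete illustration (the message content is hypothetical, not taken from the upstream tests), a CSV message parses field-by-field into the payload columns in declaration order:

-- hypothetical CSV message: 42,hello,2024-01-01,2024-01-01 12:00:00
-- field 1 -> some_int, 2 -> some_text, 3 -> some_date, 4 -> some_time
SELECT some_int, some_text, some_date, some_time
FROM kafka_test
WHERE part = 0 AND offs >= 0
LIMIT 10;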

JSON Messages

CREATE FOREIGN TABLE kafka_test_json (
    part int OPTIONS (partition 'true'),
    offs bigint OPTIONS (offset 'true'),
    some_int int OPTIONS (json 'int_val'),
    some_text text OPTIONS (json 'text_val'),
    some_date date OPTIONS (json 'date_val'),
    some_time timestamp OPTIONS (json 'time_val')
)
SERVER kafka_server
OPTIONS (
    format 'json',
    topic 'contrib_regress_json',
    batch_size '30',
    buffer_delay '100'
);

For JSON, each column can map to an object key with the json option. The current implementation supports JSON objects, not top-level JSON arrays.
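
For example, a hypothetical message shaped like the one below (again, not from the upstream tests) would populate all four payload columns, since each json option names the object key a column reads from:

-- hypothetical JSON message in topic 'contrib_regress_json':
--   {"int_val": 42, "text_val": "hello",
--    "date_val": "2024-01-01", "time_val": "2024-01-01 12:00:00"}
SELECT some_int, some_text, some_date, some_time
FROM kafka_test_json
WHERE part = 0 AND offs >= 0
LIMIT 10;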

Querying and Producing

The offset and partition columns are special, and the upstream README recommends specifying them in queries whenever possible:

SELECT * FROM kafka_test WHERE part = 0 AND offs > 1000 LIMIT 60;

SELECT *
FROM kafka_test
WHERE (part = 0 AND offs > 100)
   OR (part = 1 AND offs > 300)
   OR (part = 3 AND offs > 700);

Messages can also be produced with INSERT statements. If a partition value is supplied, it is used; otherwise Kafka’s built-in partitioner chooses one:

INSERT INTO kafka_test(part, some_int, some_text)
VALUES
    (0, 5464565, 'some text goes into partition 0'),
    (NULL, 5464565, 'some text goes into partition selected by kafka');

Error Handling

The default behavior is permissive about column counts, but not about values:

  • missing trailing columns are treated as NULL
  • extra fields are ignored
  • unparsable values still raise errors

Relevant table options and helper columns include (see the sketch after this list):

  • strict 'true' to reject column count mismatches
  • ignore_junk 'true' to set malformed values to NULL
  • columns marked junk 'true' to capture the original payload
  • columns marked junk_error 'true' to capture parsing errors
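
A minimal sketch combining these options on the CSV table from above (the table name is illustrative; strict and ignore_junk address different failure modes, so they can be set together):

CREATE FOREIGN TABLE kafka_test_junk (
    part       int    OPTIONS (partition 'true'),
    offs       bigint OPTIONS (offset 'true'),
    some_int   int,
    some_text  text,
    junk       text   OPTIONS (junk 'true'),        -- captures the raw payload
    junk_error text   OPTIONS (junk_error 'true')   -- captures the parsing error
)
SERVER kafka_server
OPTIONS (
    format 'csv',
    topic 'contrib_regress',
    strict 'true',        -- reject rows with a column count mismatch
    ignore_junk 'true'    -- set malformed values to NULL instead of erroring
);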

Build Notes

The extension links against librdkafka, and the upstream build follows the standard PGXS workflow:

make && make install    # build and install against the pg_config on PATH
make installcheck       # run regression tests (needs a running Kafka)

The test setup assumes Kafka on localhost:9092 and ZooKeeper on localhost:2181.
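
A sketch for creating the regression topic before running make installcheck, using the stock Kafka CLI tools (the partition count is an assumption based on the multi-partition queries above; older Kafka releases address ZooKeeper via --zookeeper localhost:2181 instead of --bootstrap-server):

# assumption: 4 partitions, single-node test broker
kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic contrib_regress --partitions 4 --replication-factor 1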


Last Modified 2026-04-10: extension update (322e1b4)