
AvroConfluent

| Input | Output | Alias |

Description

AvroConfluent supports decoding single-object Avro messages commonly used with Kafka and Confluent Schema Registry. Each Avro message embeds a schema ID that can be resolved to the actual schema with the help of the Schema Registry. Schemas are cached once resolved.
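To illustrate where the embedded schema ID sits, here is a minimal sketch that assumes the Confluent wire format: one zero "magic" byte, then a 4-byte big-endian schema ID, then the Avro-encoded payload. The function name confluent_schema_id is hypothetical and is not part of ClickHouse or kafkacat.

```shell
# Hypothetical helper: print the schema ID embedded in one Confluent-framed
# message read from stdin. Assumed framing: 1 magic byte (0), then a 4-byte
# big-endian schema ID, then the Avro-encoded payload.
confluent_schema_id() {
  # Take the 5-byte header and print each byte as an unsigned decimal.
  set -- $(head -c 5 | od -An -tu1)
  if [ "$#" -ne 5 ] || [ "$1" -ne 0 ]; then
    echo "not a Confluent-framed message" >&2
    return 1
  fi
  # Combine the four ID bytes, most significant byte first.
  echo $(( ($2 << 24) | ($3 << 16) | ($4 << 8) | $5 ))
}
```

For example, piping a single message into it (kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 1 | confluent_schema_id) should print the ID. The Schema Registry REST API should then return the schema for that ID under /schemas/ids/<id>.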

Data Types Matching

The table below shows all data types supported by the Apache Avro format, and their corresponding ClickHouse data types in INSERT and SELECT queries.

| Avro data type INSERT | ClickHouse data type | Avro data type SELECT |
| boolean, int, long, float, double | Int(8\16\32), UInt(8\16\32) | int |
| boolean, int, long, float, double | Int64, UInt64 | long |
| boolean, int, long, float, double | Float32 | float |
| boolean, int, long, float, double | Float64 | double |
| bytes, string, fixed, enum | String | bytes or string * |
| bytes, string, fixed | FixedString(N) | fixed(N) |
| enum | Enum(8\16) | enum |
| array(T) | Array(T) | array(T) |
| map(V, K) | Map(V, K) | map(string, K) |
| union(null, T), union(T, null) | Nullable(T) | union(null, T) |
| union(T1, T2, …) ** | Variant(T1, T2, …) | union(T1, T2, …) ** |
| null | Nullable(Nothing) | null |
| int (date) *** | Date, Date32 | int (date) *** |
| long (timestamp-millis) *** | DateTime64(3) | long (timestamp-millis) *** |
| long (timestamp-micros) *** | DateTime64(6) | long (timestamp-micros) *** |
| bytes (decimal) *** | DateTime64(N) | bytes (decimal) *** |
| int | IPv4 | int |
| fixed(16) | IPv6 | fixed(16) |
| bytes (decimal) *** | Decimal(P, S) | bytes (decimal) *** |
| string (uuid) *** | UUID | string (uuid) *** |
| fixed(16) | Int128/UInt128 | fixed(16) |
| fixed(32) | Int256/UInt256 | fixed(32) |
| record | Tuple | record |

* bytes is the default. String columns whose names match the regular expression in the output_format_avro_string_column_pattern setting are written as string instead.

** The Variant type implicitly accepts null as a field value, so, for example, the Avro union(T1, T2, null) is converted to Variant(T1, T2). As a result, when producing Avro from ClickHouse, null must always be included in the Avro union type set, because it is not known during schema inference whether any value is actually null.

*** Avro logical types

Unsupported Avro logical data types:

  • time-millis
  • time-micros
  • duration

Example Usage

To quickly verify schema resolution, you can use kafkacat with clickhouse-local:

$ kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String" -q 'select * from table'
1 a
2 b
3 c

To use AvroConfluent with Kafka:

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka-broker',
    kafka_topic_list = 'topic1',
    kafka_group_name = 'group1',
    kafka_format = 'AvroConfluent';

-- For debugging, you can set format_avro_schema_registry_url in the session.
-- This approach cannot be used in production.
SET format_avro_schema_registry_url = 'http://schema-registry';

SELECT * FROM topic1_stream;

Format Settings

The Schema Registry URL is configured with format_avro_schema_registry_url.

Note

The format_avro_schema_registry_url setting must be configured in users.xml to keep its value after a restart. Alternatively, you can use the format_avro_schema_registry_url setting of the Kafka table engine.

| Setting | Description | Default |
| input_format_avro_allow_missing_fields | For Avro/AvroConfluent format: when field is not found in schema use default value instead of error | 0 |
| input_format_avro_null_as_default | For Avro/AvroConfluent format: insert default in case of null and non Nullable column | 0 |
| format_avro_schema_registry_url | For AvroConfluent format: Confluent Schema Registry URL. | |
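
As a rough sketch of how one of these settings can be passed on the command line, the function below reuses the kafkacat and clickhouse-local pipeline from Example Usage and enables input_format_avro_allow_missing_fields. The function name peek_topic and the extra column field3 (assumed to be absent from the registered schema) are hypothetical. The broker, topic, and registry URL are the same placeholders used earlier.

```shell
# Hypothetical helper: decode the first N messages of a topic and print them.
# field3 is an illustrative column assumed NOT to exist in the Avro schema;
# with input_format_avro_allow_missing_fields enabled it should be filled
# with the column default instead of raising an error.
peek_topic() {
  topic="$1"
  count="${2:-3}"
  kafkacat -b kafka-broker -C -t "$topic" -o beginning -f '%s' -c "$count" |
    clickhouse-local \
      --input-format AvroConfluent \
      --format_avro_schema_registry_url 'http://schema-registry' \
      --input_format_avro_allow_missing_fields 1 \
      -S "field1 Int64, field2 String, field3 String" \
      -q 'select * from table'
}
```

Calling peek_topic topic1 3 reads three messages from topic1. Without the extra flag, the same query would be expected to fail because field3 cannot be found in the schema.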