AvroConfluent
| Input | Output | Alias |
|-------|--------|-------|
| ✔     | ✗      |       |
Description
AvroConfluent supports decoding single-object Avro messages commonly used with Kafka and Confluent Schema Registry. Each Avro message embeds a schema ID that can be resolved to the actual schema with the help of the Schema Registry. Schemas are cached once resolved.
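For example, messages captured from a topic into a file can be decoded with the file table function, provided the Schema Registry is reachable. A minimal sketch; the file name, column list, and registry address are assumptions for illustration:

```sql
-- A minimal sketch: decode Confluent-framed Avro messages stored in
-- 'messages.avro' (hypothetical file and columns). The schema ID embedded in
-- each message is resolved against the configured Schema Registry.
SELECT *
FROM file('messages.avro', 'AvroConfluent', 'field1 Int64, field2 String')
SETTINGS format_avro_schema_registry_url = 'http://schema-registry:8081';
```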
Data Types Matching
The table below shows all data types supported by the Apache Avro format, and their corresponding ClickHouse data types in INSERT and SELECT queries.
| Avro data type INSERT | ClickHouse data type | Avro data type SELECT |
|---|---|---|
| boolean, int, long, float, double | Int(8\16\32), UInt(8\16\32) | int |
| boolean, int, long, float, double | Int64, UInt64 | long |
| boolean, int, long, float, double | Float32 | float |
| boolean, int, long, float, double | Float64 | double |
| bytes, string, fixed, enum | String | bytes or string * |
| bytes, string, fixed | FixedString(N) | fixed(N) |
| enum | Enum(8\16) | enum |
| array(T) | Array(T) | array(T) |
| map(V, K) | Map(V, K) | map(string, K) |
| union(null, T), union(T, null) | Nullable(T) | union(null, T) |
| union(T1, T2, …) ** | Variant(T1, T2, …) | union(T1, T2, …) ** |
| null | Nullable(Nothing) | null |
| int (date) *** | Date, Date32 | int (date) *** |
| long (timestamp-millis) *** | DateTime64(3) | long (timestamp-millis) *** |
| long (timestamp-micros) *** | DateTime64(6) | long (timestamp-micros) *** |
| bytes (decimal) *** | DateTime64(N) | bytes (decimal) *** |
| int | IPv4 | int |
| fixed(16) | IPv6 | fixed(16) |
| bytes (decimal) *** | Decimal(P, S) | bytes (decimal) *** |
| string (uuid) *** | UUID | string (uuid) *** |
| fixed(16) | Int128/UInt128 | fixed(16) |
| fixed(32) | Int256/UInt256 | fixed(32) |
| record | Tuple | record |
* bytes is the default, controlled by the setting output_format_avro_string_column_pattern.

** The Variant type implicitly accepts null as a field value, so for example the Avro union(T1, T2, null) will be converted to Variant(T1, T2). As a result, when producing Avro from ClickHouse, we always have to include the null type in the Avro union type set, since we do not know whether any value is actually null during schema inference.

*** Avro logical types
Unsupported Avro logical data types:

- time-millis
- time-micros
- duration
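As an illustration of the mapping table above, a table that consumes a hypothetical Avro schema with fields long id, union(null, string) name, and an int carrying the date logical type could be declared as follows (all names here are assumptions for illustration):

```sql
-- A sketch of ClickHouse column declarations matching a hypothetical Avro schema.
CREATE TABLE avro_typed_example
(
    id         Int64,             -- Avro: long
    name       Nullable(String),  -- Avro: union(null, string)
    created_at Date               -- Avro: int with the date logical type
)
ENGINE = MergeTree
ORDER BY id;
```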
Example Usage
To quickly verify schema resolution, you can use kafkacat with clickhouse-local:
```bash
$ kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String" -q 'select * from table'
1 a
2 b
3 c
```
To use AvroConfluent with the Kafka table engine:
```sql
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka-broker',
    kafka_topic_list = 'topic1',
    kafka_group_name = 'group1',
    kafka_format = 'AvroConfluent';

-- For debugging purposes you can set format_avro_schema_registry_url in a session.
-- This approach should not be used in production.
SET format_avro_schema_registry_url = 'http://schema-registry';

SELECT * FROM topic1_stream;
```
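The Kafka table itself only exposes a stream of messages; to persist them, the usual pattern is to attach a materialized view that writes into a regular table. A minimal sketch, with the target table and view names as assumptions:

```sql
-- A minimal sketch: persist the Kafka stream into a MergeTree table via a
-- materialized view. Table and view names are hypothetical.
CREATE TABLE topic1_data
(
    field1 String,
    field2 String
)
ENGINE = MergeTree
ORDER BY field1;

CREATE MATERIALIZED VIEW topic1_consumer TO topic1_data AS
SELECT field1, field2
FROM topic1_stream;
```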
Format Settings
The Schema Registry URL is configured with format_avro_schema_registry_url. To keep its value across restarts, configure format_avro_schema_registry_url in users.xml. Alternatively, you can use the format_avro_schema_registry_url setting of the Kafka table engine.
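For example, the registry URL can be supplied directly in the Kafka table definition, so it does not rely on a session-level SET. A sketch, with broker, topic, group, and registry addresses as placeholders:

```sql
-- A sketch of supplying the registry URL per table rather than per session.
-- Broker, topic, group, and registry addresses are hypothetical.
CREATE TABLE topic1_stream_v2
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka-broker',
    kafka_topic_list = 'topic1',
    kafka_group_name = 'group1',
    kafka_format = 'AvroConfluent',
    format_avro_schema_registry_url = 'http://schema-registry';
```

The table below lists the format settings relevant to AvroConfluent.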
| Setting | Description | Default |
|---|---|---|
| input_format_avro_allow_missing_fields | For Avro/AvroConfluent format: when a field is not found in the schema, use the column's default value instead of raising an error | 0 |
| input_format_avro_null_as_default | For Avro/AvroConfluent format: insert the column's default value when the Avro value is null and the column is not Nullable | 0 |
| format_avro_schema_registry_url | For AvroConfluent format: Confluent Schema Registry URL | |
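Both input settings can be enabled in a session in the same way as the registry URL above, for example when the target table declares a column that older messages do not yet contain. A sketch, reusing the topic1_stream table from the example above:

```sql
-- A sketch following the session-level pattern above: tolerate fields missing
-- from the writer schema and map Avro null values to column defaults for
-- non-Nullable columns before reading the stream.
SET input_format_avro_allow_missing_fields = 1;
SET input_format_avro_null_as_default = 1;

SELECT * FROM topic1_stream;
```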