Перейти к основному содержимому
Перейти к основному содержимому

AvroConfluent

ВводВыводПсевдоним

Описание

Apache Avro — это строчно-ориентированный формат сериализации, который использует двоичное кодирование для эффективной обработки данных. Формат AvroConfluent поддерживает чтение и запись сообщений, закодированных в Avro, с использованием Confluent Schema Registry (или API-совместимых сервисов).

В каждом сообщении используется формат передачи данных Confluent: магический байт (0x00), за которым следует 4-байтовый идентификатор схемы в формате big-endian, а затем двоичные данные Avro. При чтении ClickHouse определяет идентификатор схемы, обращаясь к реестру. При записи ClickHouse регистрирует схему, полученную на основе выходных столбцов, и добавляет полученный идентификатор в начало каждой строки. Схемы кэшируются для оптимальной производительности.

Сопоставление типов данных

В таблице ниже приведены все типы данных, поддерживаемые форматом Apache Avro, и их соответствующие типы данных ClickHouse в запросах INSERT и SELECT.

Тип данных Avro INSERTТип данных ClickHouseТип данных Avro SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytes или string *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord

* По умолчанию используется bytes; поведение определяется настройкой output_format_avro_string_column_pattern

** Тип Variant неявно допускает null в качестве значения поля, поэтому, например, Avro union(T1, T2, null) будет преобразован в Variant(T1, T2). В результате при формировании Avro из ClickHouse мы всегда должны включать тип null в набор типов Avro union, так как при выводе схемы мы не знаем, является ли какое-либо значение фактически null.

*** Логические типы Avro

Неподдерживаемые логические типы данных Avro:

  • time-millis
  • time-micros
  • duration

Настройки формата

ПараметрОписаниеЗначение по умолчанию
input_format_avro_allow_missing_fieldsИспользовать ли значение по умолчанию вместо возникновения ошибки, когда поле не найдено в схеме.0
input_format_avro_null_as_defaultИспользовать ли значение по умолчанию вместо возникновения ошибки при вставке значения null в столбец, не допускающий значения null.0
format_avro_schema_registry_urlURL Confluent Schema Registry. Для базовой аутентификации URL-кодированные учетные данные могут быть включены непосредственно в путь URL.
format_avro_schema_registry_connection_timeoutТайм-аут соединения в секундах для HTTP-клиента Schema Registry (используется как для получения схемы, так и для регистрации). Должен быть больше 0 и меньше 600 (10 минут).1
format_avro_schema_registry_send_timeoutТайм-аут отправки в секундах для HTTP-клиента Schema Registry. Должен быть больше 0 и меньше 600 (10 минут).1
format_avro_schema_registry_receive_timeoutТайм-аут получения в секундах для HTTP-клиента Schema Registry. Должен быть больше 0 и меньше 600 (10 минут).1
output_format_avro_confluent_subjectДля вывода: имя subject, под которым схема регистрируется в Schema Registry. Обязательно при записи.
output_format_avro_string_column_patternДля вывода: регулярное выражение для столбцов типа String, которые нужно сериализовать как Avro string (по умолчанию — bytes).

Примеры

Чтение из Kafka

Чтобы прочитать Kafka-топик, закодированный в Avro, с помощью движка таблиц Kafka, используйте настройку format_avro_schema_registry_url для указания URL-адреса реестра схем.

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

Запись в Kafka

Чтобы записывать сообщения AvroConfluent в Kafka-топик, укажите URL-адрес реестра схем и имя subject. При первой записи схема автоматически регистрируется в реестре.

CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');

Использование базовой аутентификации

Если для вашего реестра схем требуется базовая аутентификация (например, при использовании Confluent Cloud), вы можете указать URL-кодированные учетные данные в настройке format_avro_schema_registry_url.

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

Диагностика неполадок

Чтобы отслеживать ход ингестии и отлаживать ошибки потребителя Kafka, вы можете выполнить запрос к системной таблице system.kafka_consumers. Если в вашем развертывании несколько реплик (например, ClickHouse Cloud), необходимо использовать табличную функцию clusterAllReplicas.

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

Если вы столкнулись с проблемами с разрешением схемы, вы можете использовать kafkacat вместе с clickhouse-local для диагностики:

$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c