Extracting Metadata From Topics
Reading Apache Kafka Data
Reads in Apache Kafka do not have a natural stopping point. To avoid reading indefinitely, messages are read until the ReadDuration or Timeout expires. ReadDuration is set to 30 seconds by default.
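Conceptually, this is a consume loop that keeps polling until a deadline passes. The sketch below illustrates the idea with the confluent-kafka Python client; the broker address, topic name, group id, and the 30-second window are placeholder assumptions chosen to mirror the default ReadDuration, not the provider's internal code.

```python
# Minimal sketch of a time-bounded read loop using the confluent-kafka client.
# Broker, topic, and group id are placeholders.
import time
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",   # assumed broker address
    "group.id": "read-duration-demo",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["example-topic"])        # assumed topic name

deadline = time.monotonic() + 30             # stop after 30 seconds (ReadDuration)
rows = []
try:
    while time.monotonic() < deadline:
        msg = consumer.poll(timeout=1.0)     # wait up to 1 second per poll
        if msg is None or msg.error():
            continue                         # no message yet, or a transient error
        rows.append(msg.value())
finally:
    consumer.close()

print(f"Read {len(rows)} messages before the window expired")
```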
Data Format Resolution
Kafka stores message data as binary blobs without any metadata that specifies the encoding. This applies to both message values (SerializationFormat) and message keys (MessageKeyType). The provider uses both of these properties, together with TypeDetectionScheme, to determine how to decode data from Kafka.
The same rules apply to both SerializationFormat and MessageKeyType. For brevity, the following list covers only SerializationFormat (an illustrative decoding sketch follows the list):
- TypeDetectionScheme=MessageOnly has the highest priority.
  - If SerializationFormat is set to a binary type (AUTO, NONE/Binary, Avro), the value is reported as a base64-encoded string.
  - If SerializationFormat is set to a numeric type (Long, Integer, Float, Double), the value is reported as a number of that type.
  - Otherwise, the value is decoded as a string. The encoding is assumed to be UTF-8, but you can set Charset to override this.
- The primitive types (NONE/Binary, String, Long, Integer, Float, Double) have the next highest priority. If SerializationFormat is set to one of these types, the provider does not row scan or use the schema registry to determine the type. Instead, the value is decoded into a single column of the given type.
- TypeDetectionScheme=SchemaRegistry has the next highest priority.
  - If SerializationFormat is set to a primitive type (NONE/Binary, String, Long, Integer, Float, Double), the schema stored in the registry is not used.
  - The format of the registry schema is checked next. A registry schema typically includes its format as part of its definition. Even in cases where it does not, the provider can usually detect which format a schema is designed for.
  - SerializationFormat is used as a fallback if the schema definition does not specify a format.
  - Otherwise, the provider assumes the format is Avro.
- TypeDetectionScheme=None and TypeDetectionScheme=RowScan have the lowest priority.
  - If SerializationFormat is set to a primitive type (NONE/Binary, String, Long, Integer, Float, Double), no row scan is performed.
  - The format specified in SerializationFormat is used if it is not AUTO.
  - Otherwise, the provider uses a heuristic based on the start of the message to determine the appropriate format.
  - If no format can be determined, the provider assumes the format is CSV.
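As a rough illustration of the MessageOnly precedence above, the following sketch decodes a single raw message value. The function name, the set of format names, and the big-endian byte order are assumptions for illustration only; this is not the provider's internal logic.

```python
# Illustrative sketch: decode one raw message value following the MessageOnly
# precedence described above. Names and heuristics are assumptions.
import base64
import struct

def decode_message_only(raw: bytes, serialization_format: str, charset: str = "utf-8"):
    binary_formats = {"AUTO", "NONE", "BINARY", "AVRO"}
    numeric_formats = {
        "LONG": ">q",      # 8-byte signed integer, big-endian (assumed)
        "INTEGER": ">i",   # 4-byte signed integer, big-endian (assumed)
        "FLOAT": ">f",     # 4-byte IEEE float
        "DOUBLE": ">d",    # 8-byte IEEE double
    }
    fmt = serialization_format.upper()

    if fmt in binary_formats:
        # Binary types are reported as base64-encoded strings.
        return base64.b64encode(raw).decode("ascii")
    if fmt in numeric_formats:
        # Numeric types are unpacked into a number of that type.
        return struct.unpack(numeric_formats[fmt], raw)[0]
    # Anything else falls back to a string decode; Charset overrides UTF-8.
    return raw.decode(charset)

print(decode_message_only(b"\x00\x00\x00\x2a", "Integer"))  # -> 42
print(decode_message_only(b"hello", "String"))              # -> 'hello'
```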
Note that the provider uses the same TypeDetectionScheme for both keys and values. This means, for example, that you cannot use a row scan to resolve the key format while using the schema registry to resolve the value format. Apart from this restriction, key and value formats are independent. The provider supports configurations like the following, which reads from topics that contain 4-byte integer keys and Avro-encoded values (see the sketch after the list):
- TypeDetectionScheme=SchemaRegistry
- MessageKeyType=Integer
- SerializationFormat=Avro
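Outside the provider, reading such a topic might look roughly like the sketch below, which uses the confluent-kafka Python client and its schema-registry Avro deserializer. The broker address, registry URL, and topic name are placeholders, and the exact client API may vary by library version.

```python
# Sketch of reading a topic with 4-byte integer keys and Avro-encoded values,
# matching the configuration above. Addresses and names are placeholders.
import struct
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

registry = SchemaRegistryClient({"url": "http://localhost:8081"})  # assumed registry URL
avro_deserializer = AvroDeserializer(registry)   # writer schema fetched from the registry

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "int-key-avro-value-demo",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["example-topic"])

msg = consumer.poll(timeout=10.0)
if msg is not None and not msg.error():
    key = struct.unpack(">i", msg.key())[0]      # MessageKeyType=Integer: 4-byte int
    value = avro_deserializer(
        msg.value(), SerializationContext(msg.topic(), MessageField.VALUE)
    )                                            # SerializationFormat=Avro via registry
    print(key, value)
consumer.close()
```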
Schema Registry
Set the following to connect to a service with a schema registry:
- BootstrapServers: The server (hostname or IP address) and port, in the format server:port, of the Apache Kafka bootstrap servers.
- TypeDetectionScheme: Set to SchemaRegistry.
- RegistryAuthScheme: Set to the appropriate authentication method; see the following sections for details.
- RegistryService: The schema registry service used to read topic schemas. The options are Confluent and AWSGlue.
- RegistryUrl: Set to the server for the schema registry.
The schema registry contains a list of topics that have registered schemas. The lists of tables and columns are read from the schema registry and validated against Kafka. The provider only surfaces topics that both have a schema in the registry and exist in Kafka.
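A minimal sketch of that validation step, assuming the confluent-kafka Python client and a Confluent-style registry, might look like the following. The URL, credentials, and the loose subject-to-topic matching are placeholder assumptions; the actual mapping depends on the registry's naming convention.

```python
# Sketch of the validation step: read registered subjects and keep only the
# topics that also exist in Kafka. URLs and credentials are placeholders.
from confluent_kafka.admin import AdminClient
from confluent_kafka.schema_registry import SchemaRegistryClient

registry = SchemaRegistryClient({
    "url": "http://localhost:8081",              # RegistryUrl (placeholder)
    "basic.auth.user.info": "regUser:regPass",   # RegistryUser/RegistryPassword for Basic auth
})
admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # BootstrapServers (placeholder)

kafka_topics = set(admin.list_topics(timeout=10).topics)      # topics that exist in Kafka
subjects = registry.get_subjects()                            # subjects with registered schemas

# A topic is surfaced only if it exists in Kafka and has a registered schema.
# The prefix check is a simplification; real subject naming is registry-specific.
surfaced = sorted(t for t in kafka_topics if any(s.startswith(t) for s in subjects))
print(surfaced)
```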
Confluent Schema Registry
When you connect to Confluent Cloud, the RegistryUrl corresponds to the Schema Registry endpoint value in Schemas -> Schema Registry -> Instructions.
The Confluent schema registry supports several authentication options. Confluent Cloud deployments will typically require RegistryAuthScheme to be set to Basic, along with a RegistryUser and RegistryPassword. These can be found by navigating to Schemas > Schema Registry > API Access and finding the access key and secret key values.
On-premise deployments may not require authentication; in these configurations, RegistryAuthScheme should be set to None. They may also require SSL client certificates, which can be provided by setting RegistryAuthScheme to SSLCertificate along with the RegistryClientCert and RegistryClientCertType options.
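With the confluent-kafka Python client, these two on-premise cases might be configured roughly as follows. The URLs and certificate paths are placeholders, and the client config keys shown are analogous to, not the same as, the provider's RegistryClientCert and RegistryClientCertType options.

```python
# Sketch of on-premise Confluent registry clients: one with no authentication,
# one presenting an SSL client certificate. Paths and URLs are placeholders.
from confluent_kafka.schema_registry import SchemaRegistryClient

# RegistryAuthScheme=None: only the registry URL is needed.
no_auth_registry = SchemaRegistryClient({"url": "http://registry.internal:8081"})

# RegistryAuthScheme=SSLCertificate: present a client certificate and key.
ssl_registry = SchemaRegistryClient({
    "url": "https://registry.internal:8081",
    "ssl.certificate.location": "/etc/kafka/client.pem",  # client certificate (placeholder path)
    "ssl.key.location": "/etc/kafka/client.key",          # client private key (placeholder path)
    "ssl.ca.location": "/etc/kafka/ca.pem",               # CA bundle (placeholder path)
})
```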
Schemas in Confluent schema registries must have an appropriate name for the provider to recognize them. Value schemas must have the -value suffix, while key schemas must have the -key suffix. The provider supports topics that have only value schemas but does not allow topics that have only key schemas.
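The suffix rule can be illustrated with a small sketch that maps registry subjects to topics; the subject names here are illustrative only.

```python
# Sketch of the naming rule above: map registry subjects to topics by suffix.
subjects = ["orders-value", "orders-key", "payments-value", "audit-key"]

value_topics = {s[: -len("-value")] for s in subjects if s.endswith("-value")}
key_topics = {s[: -len("-key")] for s in subjects if s.endswith("-key")}

# Topics with only a value schema are supported; topics with only a key schema are not.
supported = sorted(value_topics)                 # ['orders', 'payments']
unsupported = sorted(key_topics - value_topics)  # ['audit']
print(supported, unsupported)
```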
AWS Glue Schema Registry
When connecting to AWS Glue, the RegistryUrl corresponds to the ARN value of the registry.
The AWS Glue schema registry only supports the Basic RegistryAuthScheme. RegistryUser and RegistryPassword should be set to the access key and secret key of a user with access to the registry.
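For comparison, listing the schemas in a Glue registry directly with boto3 might look like the sketch below. The region, registry ARN, and key values are placeholders, and the response handling is an assumption about the Glue ListSchemas API.

```python
# Sketch of listing schemas in an AWS Glue schema registry with boto3.
# The ARN, region, and credentials are placeholders.
import boto3

glue = boto3.client(
    "glue",
    region_name="us-east-1",               # assumed region
    aws_access_key_id="AKIA...",           # RegistryUser (access key, placeholder)
    aws_secret_access_key="<secret-key>",  # RegistryPassword (secret key, placeholder)
)

registry_arn = "arn:aws:glue:us-east-1:123456789012:registry/example-registry"  # RegistryUrl
response = glue.list_schemas(RegistryId={"RegistryArn": registry_arn})
for schema in response["Schemas"]:
    print(schema["SchemaName"])
```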
No Schema Registry
Set the following to connect to a service without a schema registry:
- BootstrapServers: The server (hostname or IP address) and port, in the format server:port, of the Apache Kafka bootstrap servers.
- TypeDetectionScheme: Set to RowScan or None.
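A row scan boils down to sampling messages and guessing the format from their leading bytes. The sketch below shows one such heuristic; the specific rules and the CSV fallback are assumptions for illustration, not the provider's actual detection logic.

```python
# Illustrative RowScan-style heuristic: peek at a sampled message and guess
# its format from the first bytes. Rules and fallback are assumptions.
import json

def guess_format(sample: bytes) -> str:
    text = sample.lstrip()
    if text.startswith(b"{") or text.startswith(b"["):
        try:
            json.loads(text)
            return "JSON"
        except ValueError:
            pass
    if text.startswith(b"<"):
        return "XML"
    # Fall back to CSV when nothing else matches, mirroring the priority list above.
    return "CSV"

print(guess_format(b'{"id": 1, "name": "widget"}'))  # JSON
print(guess_format(b"id,name\n1,widget"))            # CSV
```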