Subscribe to a Kafka Topic for Text

The Subscribe to a Kafka Topic for Text Input Connector can be used to retrieve and adapt event data records, formatted as delimited text, from an Apache Kafka® Topic. The text delimiter is usually a comma, so this type of text data is sometimes referred to as comma separated value data, but ArcGIS GeoEvent Server can use any normal ASCII character as a delimiter to separate data attribute values. For more information about getting started with Apache Kafka®, see Apache Kafka Introduction.

Usage notes

  • Use this input connector to consume data as formatted or delimited text from a Kafka Topic. This input connector is a consumer of Kafka.
  • This input connector pairs the Text Inbound Adapter with the Kafka Inbound Transport.
  • Delimited text does not have to contain data which represents a geometry. Therefore, this input connector can be used to retrieve and process non-spatial data from Kafka.
  • The adapter supports the ability to construct a point geometry from X, Y, and Z attribute fields.
  • An Event Separator and a Field Separator are required to parse delimited text from Kafka. The Event Separator indicates the character which identifies the end of a data record; the default is \n (newline). The Field Separator specifies the character used to separate one attribute value from another in a single line of text; the default is , (comma). Any normal ASCII character, specified as the character or Unicode value, can be used as an event or field separator.
  • The Kafka Inbound Transport supports TLS 1.2 and SASL security protocols for authenticating with a Kafka cluster or broker.
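
The two-level parsing described in the notes above (split the payload into records, then split each record into attribute values) can be sketched in Python as follows. This is a minimal illustration, not the adapter's implementation: the function name, the parameter names, and the sample record values are hypothetical, and quoted values that contain a separator are not handled.

```python
def parse_delimited(payload, event_sep="\n", field_sep=","):
    """Split raw text into records, then split each record into attribute values."""
    records = []
    for record in payload.split(event_sep):
        record = record.strip("\r")  # tolerate Windows-style line endings
        if not record:
            continue  # skip empty records, such as the one after a trailing separator
        records.append(record.split(field_sep))
    return records


# Invented sample payload: two records, four attributes each.
payload = "vehicle-1,2019-12-31T23:59:59,-117.19,34.05\nvehicle-2,2019-12-31T23:59:59,-118.24,34.06\n"
for values in parse_delimited(payload):
    print(values)
```

Any other single character can be passed for either separator, for example `parse_delimited(text, event_sep=";", field_sep="|")`.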

Parameters

Parameter | Description

Name

A descriptive name for the input connector used for reference in GeoEvent Manager.

Override with Custom Kafka Properties

Specifies whether to override the default GeoEvent Server Kafka client properties. The default is No.

  • Yes—The default Kafka client properties exposed by the transport will be overridden. A folder registered with GeoEvent Server must be specified that contains a Kafka .properties file with the correct formatting for a valid Kafka configuration. See Apache Kafka Configuration for a list of supported configurations and expected formatting for the specified .properties file.
  • No—The default Kafka client properties exposed by the transport will not be overridden. Kafka Bootstrap Servers and Consumer Group ID must be specified.
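
As an illustration of what a custom Kafka .properties file might contain, the following Python sketch writes a few standard Apache Kafka consumer settings as key=value lines and reads them back. The property names are standard Kafka consumer configurations, but the values, the group name, and the helper functions are hypothetical, and which properties GeoEvent Server honors is not stated here; see Apache Kafka Configuration for the authoritative list.

```python
import tempfile
from pathlib import Path

# Standard Apache Kafka consumer settings; all values are placeholders.
consumer_properties = {
    "bootstrap.servers": "broker0.example.com:9092,broker1.example.com:9092",
    "group.id": "my-geoevent-text-consumers",  # hypothetical consumer group name
    "auto.offset.reset": "earliest",
    "security.protocol": "PLAINTEXT",
}


def write_properties(folder, file_name, properties):
    """Write one key=value line per property to <folder>/<file_name>.properties."""
    path = Path(folder) / f"{file_name}.properties"
    lines = [f"{key}={value}" for key, value in properties.items()]
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return path


def read_properties(path):
    """Read key=value lines back into a dict, skipping blank lines and # comments."""
    properties = {}
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        properties[key.strip()] = value.strip()
    return properties


# A temporary directory stands in for the folder registered with GeoEvent Server.
registered_folder = Path(tempfile.mkdtemp())
properties_path = write_properties(registered_folder, "sample", consumer_properties)
print(properties_path.read_text(encoding="utf-8"))
```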

Kafka Bootstrap Servers

(Conditional)

A list of hostname:port pairs used to establish the initial connection to the Kafka cluster. Hostname:port pairs must be comma-separated, for example:

  • broker0.example.com:9092,broker1.example.com:9092,broker2.example.com:9092

Parameter is shown when Override with Custom Kafka Properties is set to No.
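
To check a bootstrap server list before entering it, the comma-separated hostname:port format can be validated with a short Python sketch such as the following. The function name is hypothetical and the check is only syntactic; it does not attempt to connect to the brokers.

```python
def parse_bootstrap_servers(value):
    """Return (hostname, port) tuples from a comma-separated hostname:port list."""
    servers = []
    for pair in value.split(","):
        pair = pair.strip()
        # Split on the last colon so the port is always the final component.
        host, sep, port = pair.rpartition(":")
        if not sep or not host or not port.isdecimal():
            raise ValueError(f"expected hostname:port, got {pair!r}")
        servers.append((host, int(port)))
    return servers


print(parse_bootstrap_servers(
    "broker0.example.com:9092,broker1.example.com:9092,broker2.example.com:9092"
))
```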

Topic Name(s)

The name of a Kafka topic, or a list of Kafka topics, from which to consume data. Multiple topics must be separated by a semicolon, for example:

  • topic1
  • topic1;topic2;topic3;topic4

Note:

The ability to specify multiple Kafka topics is supported in ArcGIS GeoEvent Server 10.8 and later.

Note:

Data order from a specific topic is not guaranteed.

Number of Consumers

Specifies the number of consumers for each consumer group. The default is 1.

Note:

The number of consumers is limited by the number of partitions on the Kafka topic. See Apache Kafka Introduction for more information on consumer instances.

Consumer Group ID

(Conditional)

An optional string that uniquely identifies the consumer group for a set of consumers. Also known as the consumer group name.

If a Consumer Group ID is unspecified, GeoEvent Server will assign a static consumer group ID called geoevent-consumer. This static consumer group ID is shared across all instances of the Kafka connector where the Consumer Group ID is left unspecified.

Specifying a custom Consumer Group ID is highly recommended. Refer to Apache Kafka Introduction for more information about consumer groups.

Parameter is shown when Override with Custom Kafka Properties is set to No.

Registered Folder for the Kafka Properties File

(Conditional)

The folder registered with GeoEvent Server that contains the Kafka .properties file. The Kafka .properties file defines the custom Kafka properties when Override with Custom Kafka Properties is set to Yes. Ensure that the folder registered with GeoEvent Server is the full path to where the Kafka .properties file is located.

Parameter is shown when Override with Custom Kafka Properties is set to Yes.

Kafka Properties File Name

(Conditional)

The name of the Kafka .properties file that contains the custom Kafka properties for client configuration. The name of the file should be specified without the .properties extension.

  • If the name of the custom Kafka .properties file is sample.properties, specify this parameter as sample

Parameter is shown when Override with Custom Kafka Properties is set to Yes.

Start from Beginning

Specifies whether records are consumed from the topic starting at the beginning offset, or from the last offset committed for the consumer. The default is Yes.

  • Yes—Records will be consumed from the topic at the beginning offset.
  • No—Records will be consumed from the last offset committed.

Note:

For more information about offsets, see Apache Kafka Configuration.

Event Separator

A single literal character which indicates the end of an event data record. Unicode values may be used to specify a character delimiter. The character should not be enclosed in quotes. A newline (\n) is a common end-of-record delimiter.

Field Separator

A single literal character used to separate one attribute value from another in a message. Unicode values may be used to specify a character delimiter. The character should not be enclosed in quotes. A comma (,) is a common attribute delimiter.

Create GeoEvent Definition

Specifies whether a new or existing GeoEvent Definition should be used for the inbound event data. A GeoEvent Definition is required for GeoEvent Server to understand the inbound event data attribute fields and data types.

  • Yes—A new GeoEvent Definition will be created based on the schema of the first event record received.
  • No—A new GeoEvent Definition will not be created. Select an existing GeoEvent Definition that matches the schema of the inbound event data.

GeoEvent Definition Name (New)

(Conditional)

The name assigned to a new GeoEvent Definition. If a GeoEvent Definition with the specified name already exists, the existing GeoEvent Definition will be used. Otherwise, the first data record received will be used to determine the expected schema of subsequent data records, and a new GeoEvent Definition will be created based on that first data record's schema.

Parameter is shown when Create GeoEvent Definition is set to Yes and is hidden when set to No.

GeoEvent Definition Name (Existing)

(Conditional)

The name of an existing GeoEvent Definition to use when adapting received data to create event data for processing by a GeoEvent Service.

Parameter is shown when Create GeoEvent Definition is set to No and is hidden when set to Yes.

Construct Geometry from Fields

Specifies whether the input connector should construct a point geometry using coordinate values received as attributes. The default is No.

  • Yes—Values from specified event attribute fields will be used to construct a point geometry.
  • No—A point geometry will not be constructed. It is assumed an attribute field contains a value which can be interpreted as a geometry or the event record is nonspatial (does not have a geometry).

X Geometry Field

(Conditional)

The attribute field in the inbound event data containing the X coordinate part (for example, horizontal or longitude) of a point location.

Parameter is shown when Construct Geometry from Fields is set to Yes and is hidden when set to No.

Y Geometry Field

(Conditional)

The attribute field in the inbound event data containing the Y coordinate part (for example, vertical or latitude) of a point location.

Parameter is shown when Construct Geometry from Fields is set to Yes and is hidden when set to No.

Z Geometry Field

(Conditional)

The attribute field in the inbound event data containing the Z coordinate part (for example, depth or altitude) of a point location. If left blank, the Z value will be omitted and a 2D point geometry will be constructed.

Parameter is shown when Construct Geometry from Fields is set to Yes and is hidden when set to No.
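
The way a point geometry is assembled from X, Y, and optional Z attribute fields can be sketched in Python as follows. This is an illustration under stated assumptions: the function name and the field names lon, lat, and alt are hypothetical, and the dictionary layout follows the general Esri JSON point form rather than GeoEvent Server's internal representation. WKID 4326 is WGS84.

```python
def build_point(record, x_field, y_field, z_field="", wkid=4326):
    """Build a point from named attribute fields; omit Z when no Z field is given."""
    point = {"x": float(record[x_field]), "y": float(record[y_field])}
    if z_field:
        point["z"] = float(record[z_field])  # 3D point only when a Z field is configured
    point["spatialReference"] = {"wkid": wkid}
    return point


# Invented sample record; values arrive as text and are converted to numbers.
record = {"id": "vehicle-1", "lon": "-117.19", "lat": "34.05", "alt": "412.5"}
print(build_point(record, "lon", "lat"))          # 2D point
print(build_point(record, "lon", "lat", "alt"))   # 3D point
```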

Default Spatial Reference

The well-known ID (WKID) of a spatial reference to be used when a geometry is constructed from attribute field values whose coordinates are not latitude and longitude values for an assumed WGS84 geographic coordinate system, or geometry strings are received that do not include a spatial reference. A well-known text (WKT) value or the name of an attribute field containing the WKID or WKT may also be specified.

Expected Date Format

The pattern used to match expected string representations of date/time values and convert them to Java Date values. The pattern's format follows the Java SimpleDateFormat class convention. This parameter has no default value.

While GeoEvent Server prefers date/time values to be expressed in the ISO 8601 standard, several string representations of date/time values commonly recognized as date values can be converted to Java Date values without specifying an Expected Date Format pattern. These include:

  • "2019-12-31T23:59:59"—The ISO 8601 standard format
  • 1577836799000—Java Date (epoch long integer; UTC)
  • "Tue Dec 31 23:59:59 -0000 2019"—A common web services string format
  • "12/31/2019 11:59:59 PM"—Common format used in the United States (12-hour clock)
  • "12/31/2019 23:59:59"—Common format used in the United States (24-hour clock)

If the date/time values received are expressed using a convention other than one of the five shown above, you will have to specify an Expected Date Format so GeoEvent Server knows how the date/time values should be adapted.
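
The fallback behavior described above (try the recognized representations, and rely on an explicit pattern for anything else) can be sketched in Python. This is only an analogy: GeoEvent Server uses Java SimpleDateFormat patterns, whereas Python's strptime uses different directives (for example, the SimpleDateFormat pattern MM/dd/yyyy HH:mm:ss corresponds to %m/%d/%Y %H:%M:%S). The function name and pattern list are hypothetical, and strings without an offset are returned as naive datetime values.

```python
from datetime import datetime, timezone

# strptime equivalents of the string representations listed above.
KNOWN_PATTERNS = [
    "%Y-%m-%dT%H:%M:%S",        # ISO 8601
    "%a %b %d %H:%M:%S %z %Y",  # common web services string format
    "%m/%d/%Y %I:%M:%S %p",     # United States, 12-hour clock
    "%m/%d/%Y %H:%M:%S",        # United States, 24-hour clock
]


def parse_date(value, expected_format=None):
    """Convert an epoch value (milliseconds) or a date string to a datetime."""
    if isinstance(value, int):
        # Epoch long integer: milliseconds since 1970-01-01T00:00:00 UTC.
        return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
    # An explicitly supplied pattern is tried first, then the known ones.
    patterns = ([expected_format] if expected_format else []) + KNOWN_PATTERNS
    for pattern in patterns:
        try:
            return datetime.strptime(value, pattern)
        except ValueError:
            continue
    raise ValueError(f"no pattern matched {value!r}; an expected format is needed")


print(parse_date("12/31/2019 11:59:59 PM"))
print(parse_date(1577836799000))
print(parse_date("31.12.2019 23:59", expected_format="%d.%m.%Y %H:%M"))
```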

Language for Number Formatting

The locale identifier (ID) used for locale-sensitive behavior when formatting numbers from data values. The default is the locale of the machine GeoEvent Server is installed on. For more information, see Java Supported Locales.

Authentication Required

Specifies whether the connection to the Kafka cluster, or Kafka broker, requires authentication. The default is No.

  • Yes—Authentication is required to the Kafka cluster or broker.
  • No—Authentication is not required to connect to the Kafka cluster or broker.

Authenticate Using

(Conditional)

Specifies the security protocol that is used to secure the Kafka cluster. Available security protocols include TLS 1.2 and SASL.

  • TLS 1.2—The security protocol used by the Kafka cluster is TLS 1.2. Ensure that the Kafka cluster's PKI file (x509 certificate) is imported into the trust store of the ArcGIS Server with which ArcGIS GeoEvent Server is configured. For specific instructions on importing certificates, see the Import the certificate into ArcGIS Server section in Configure ArcGIS Server with an existing CA-signed certificate.
  • SASL—The security protocol used by the Kafka cluster is SASL. Only SASL SSL and Kerberos are supported.

Note:

When using Kerberos, ensure the operating system user account running ArcGIS GeoEvent Server has read-access to the keytab file in the Kerberos setup/configuration.

Parameter is shown when Authentication Required is set to Yes.

Registered Folder for Credential File

(Conditional)

The folder registered with GeoEvent Server that contains the Kafka cluster's PKI file (x509 certificate). Ensure that the folder registered with GeoEvent Server is the full path to where the Kafka cluster's certificate is located.

Parameter is shown when Authentication Required is set to Yes. This parameter is applicable to TLS 1.2 only.

Credential Configuration File

(Conditional)

The name of the Kafka cluster's PKI file (x509 certificate). The certificate and its associated private key must be stored in the PKCS#12 format, which is represented by a file with either the .p12 or .pfx extension. Provide the name of the file in addition to the extension.

  • my_kafka_certificate.pfx
  • my_other_kafka_certificate.p12

Note:

Only the certificate file name and extension are supported for this parameter. Relative paths to the certificate should not be specified in this parameter. Register the full path to the certificate file using the Registered Folder for Credential File parameter.

Parameter is shown when Authentication Required is set to Yes. This parameter is applicable to TLS 1.2 only.

Keystore Password

(Conditional)

The password for the Kafka cluster's PKI file (x509 certificate). Also known as the certificate's private key.

Parameter is shown when Authentication Required is set to Yes. This parameter is applicable to TLS 1.2 only.

SASL Authentication Type

(Conditional)

Specifies the type of SASL authentication mechanism supported by the Kafka cluster. Available SASL authentication types include SASL/GSSAPI (Kerberos) and SASL/PLAIN.

  • SASL/GSSAPI (Kerberos)—The Kafka cluster uses SASL/GSSAPI Kerberos authentication.
  • SASL/PLAIN—The Kafka cluster uses SASL/PLAIN authentication.

Parameter is shown when Authentication Required is set to Yes. This parameter is applicable to SASL only.

Kerberos Principal

(Conditional)

The Kerberos principal for the specific user. For example:

  • GeoEventKafkaClient1@example.com

Parameter is shown when Authentication Required is set to Yes. This parameter is applicable to SASL/GSSAPI (Kerberos) only.

Use Key Tab

(Conditional)

Indicates whether to use the keytab in the Kerberos settings. The default is Yes.

  • Yes—The keytab will be used in the Kerberos settings.
  • No—The keytab will not be used in the Kerberos settings.

Parameter is shown when Authentication Required is set to Yes. This parameter is applicable to SASL/GSSAPI (Kerberos) only.

Store Key

(Conditional)

Indicates whether to store the key in the Kerberos settings. The default is Yes.

  • Yes—The key will be stored in the Kerberos settings.
  • No—The key will not be stored in the Kerberos settings.

Parameter is shown when Authentication Required is set to Yes. This parameter is applicable to SASL/GSSAPI (Kerberos) only.
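
For reference, a standard Apache Kafka client expresses the Kerberos principal, keytab, and store key settings as a JAAS configuration string for the Krb5LoginModule. The Python sketch below builds such a string. How GeoEvent Server passes these parameters to Kafka internally is an assumption, and the function name and keytab path are hypothetical.

```python
def kerberos_jaas_config(principal, keytab_path, use_key_tab=True, store_key=True):
    """Build a sasl.jaas.config value for SASL/GSSAPI (Kerberos)."""
    options = [
        f"useKeyTab={str(use_key_tab).lower()}",
        f"storeKey={str(store_key).lower()}",
    ]
    if use_key_tab:
        # The keytab location is only relevant when the keytab is used.
        options.append(f'keyTab="{keytab_path}"')
    options.append(f'principal="{principal}"')
    return (
        "com.sun.security.auth.module.Krb5LoginModule required "
        + " ".join(options)
        + ";"
    )


print(kerberos_jaas_config(
    "GeoEventKafkaClient1@example.com",
    "/path/to/geoevent.keytab",  # hypothetical keytab location
))
```

In a Kafka client configuration, a string like this is typically paired with sasl.mechanism set to GSSAPI.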

Username

(Conditional)

Specifies the username used to authenticate with the Kafka cluster. Also known as a connection string with certain cloud providers. Refer to the documentation of the chosen cloud provider for correct syntax.

Parameter is shown when Authentication Required is set to Yes. This parameter is applicable to SASL/PLAIN only.

Password

(Conditional)

Specifies the password used to authenticate with the Kafka cluster. Refer to the documentation of the chosen cloud provider for the correct syntax.

Parameter is shown when Authentication Required is set to Yes. This parameter is applicable to SASL/PLAIN only.
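
With a standard Apache Kafka client, a SASL/PLAIN username and password are usually supplied through a JAAS configuration string for the PlainLoginModule, together with the sasl.mechanism and security.protocol properties. The following Python sketch shows that shape. Whether GeoEvent Server builds exactly these properties is an assumption, the function name and credentials are placeholders, and the sketch does not escape quotation marks inside the values.

```python
def sasl_plain_properties(username, password):
    """Return Kafka client properties for SASL/PLAIN over an encrypted connection."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
    }


# Placeholder credentials for illustration only.
for key, value in sasl_plain_properties("geoevent-user", "example-password").items():
    print(f"{key}={value}")
```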

Considerations and limitations

  • If consumers are not managed carefully, some instances of the Subscribe to a Kafka Topic for Text input connector may not retrieve any data. The number of active consumers in a consumer group is limited by the number of partitions on a Kafka topic. If the number of consumers in a consumer group exceeds the number of partitions available on the topic, the excess consumers will not consume data. To avoid this, align the number of consumers with the number of partitions on the Kafka topic, or use a different consumer group for each connector. For more information on consumers and consumer groups, see the Kafka Documentation.
  • The Subscribe to a Kafka Topic for Text input connector is a client consumer of Kafka. Apply the same considerations to this input connector as would be required for any other client consumer of Kafka. For example, if this input connector is not receiving any data from a Kafka topic but a separate client consumer of Kafka is, consider the factors that are involved with having two client consumers. This includes but is not limited to the configured consumer group ID, number of partitions available on the topic, and the number of existing consumers. Alternatively, if the input connector is stopped and started in quick succession, consider the implications for Kafka from a consumer point-of-view. A rebalancing of the Kafka topic's partitions may prevent the input connector from immediately rejoining as a consumer under the same consumer group.
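
The relationship between partitions and consumers described above can be illustrated with a small Python simulation. It assumes a simple round-robin distribution, which is a simplification; Kafka's actual partition assignors behave differently in detail, but each partition is still consumed by at most one consumer in a group. The function names are hypothetical.

```python
def assign_partitions(num_partitions, num_consumers):
    """Distribute partitions round-robin; returns {consumer index: [partitions]}."""
    if num_consumers < 1:
        raise ValueError("at least one consumer is required")
    assignment = {consumer: [] for consumer in range(num_consumers)}
    for partition in range(num_partitions):
        assignment[partition % num_consumers].append(partition)
    return assignment


def idle_consumers(assignment):
    """Return the consumers that were assigned no partitions and so receive no data."""
    return [consumer for consumer, partitions in assignment.items() if not partitions]


# Five consumers in one group on a three-partition topic: two consumers stay idle.
assignment = assign_partitions(num_partitions=3, num_consumers=5)
print(assignment)
print(idle_consumers(assignment))
```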