Kafka Connector Tutorial

Introduction

The Kafka connector lets openLooKeng query live topic data in Apache Kafka. This tutorial shows how to set up topics and how to create the topic description files that back openLooKeng tables.

Installation

This tutorial assumes familiarity with openLooKeng and a working local openLooKeng installation (see deployment). It focuses on setting up Apache Kafka and integrating it with openLooKeng.

Step 1: Install Apache Kafka

Download and extract Apache Kafka.

Note

This tutorial was tested with Apache Kafka 0.8.1. It should work with any 0.8.x version of Apache Kafka.

Start ZooKeeper and then, in a second terminal, the Kafka server:

  $ bin/zookeeper-server-start.sh config/zookeeper.properties
  [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
  ...

  $ bin/kafka-server-start.sh config/server.properties
  [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
  [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
  ...

This starts ZooKeeper on port 2181 and Kafka on port 9092, the defaults in the two configuration files.
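Before loading data, it can help to confirm that both services accept connections. The following is a minimal sketch using only Python's standard library; `is_listening` is a hypothetical helper, not part of Kafka or openLooKeng, and it assumes both services run on localhost with the default ports named above.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeouts and name-resolution failures all land here.
        return False


if __name__ == "__main__":
    # Ports assumed from the default config/zookeeper.properties and config/server.properties.
    for service, port in (("ZooKeeper", 2181), ("Kafka", 9092)):
        status = "listening" if is_listening("localhost", port) else "NOT reachable"
        print(f"{service} on localhost:{port}: {status}")
```

A successful connection only shows that something is listening on the port; the startup log remains the better signal that the broker came up cleanly.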

Step 2: Load data

Download the kafka-tpch loader from Maven Central:

  $ curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh
  $ chmod 755 kafka-tpch

Now run the kafka-tpch program to preload a number of topics with TPC-H data:

  $ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny
  2014-07-28T17:17:07.594-0700 INFO main io.airlift.log.Logging Logging to stderr
  2014-07-28T17:17:07.623-0700 INFO main de.softwareforge.kafka.LoadCommand Processing tables: [customer, orders, lineitem, part, partsupp, supplier, nation, region]
  2014-07-28T17:17:07.981-0700 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
  2014-07-28T17:17:07.981-0700 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
  2014-07-28T17:17:07.981-0700 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
  2014-07-28T17:17:07.982-0700 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
  2014-07-28T17:17:07.982-0700 INFO pool-1-thread-5 de.softwareforge.kafka.LoadCommand Loading table 'partsupp' into topic 'tpch.partsupp'...
  2014-07-28T17:17:07.982-0700 INFO pool-1-thread-6 de.softwareforge.kafka.LoadCommand Loading table 'supplier' into topic 'tpch.supplier'...
  2014-07-28T17:17:07.982-0700 INFO pool-1-thread-7 de.softwareforge.kafka.LoadCommand Loading table 'nation' into topic 'tpch.nation'...
  2014-07-28T17:17:07.982-0700 INFO pool-1-thread-8 de.softwareforge.kafka.LoadCommand Loading table 'region' into topic 'tpch.region'...
  2014-07-28T17:17:10.612-0700 ERROR pool-1-thread-8 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.region
  2014-07-28T17:17:10.781-0700 INFO pool-1-thread-8 de.softwareforge.kafka.LoadCommand Generated 5 rows for table 'region'.
  2014-07-28T17:17:10.797-0700 ERROR pool-1-thread-3 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.lineitem
  2014-07-28T17:17:10.932-0700 ERROR pool-1-thread-1 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.customer
  2014-07-28T17:17:11.068-0700 ERROR pool-1-thread-2 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.orders
  2014-07-28T17:17:11.200-0700 ERROR pool-1-thread-6 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.supplier
  2014-07-28T17:17:11.319-0700 INFO pool-1-thread-6 de.softwareforge.kafka.LoadCommand Generated 100 rows for table 'supplier'.
  2014-07-28T17:17:11.333-0700 ERROR pool-1-thread-4 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.part
  2014-07-28T17:17:11.466-0700 ERROR pool-1-thread-5 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.partsupp
  2014-07-28T17:17:11.597-0700 ERROR pool-1-thread-7 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.nation
  2014-07-28T17:17:11.706-0700 INFO pool-1-thread-7 de.softwareforge.kafka.LoadCommand Generated 25 rows for table 'nation'.
  2014-07-28T17:17:12.180-0700 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Generated 1500 rows for table 'customer'.
  2014-07-28T17:17:12.251-0700 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Generated 2000 rows for table 'part'.
  2014-07-28T17:17:12.905-0700 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Generated 15000 rows for table 'orders'.
  2014-07-28T17:17:12.919-0700 INFO pool-1-thread-5 de.softwareforge.kafka.LoadCommand Generated 8000 rows for table 'partsupp'.
  2014-07-28T17:17:13.877-0700 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Generated 60175 rows for table 'lineitem'.

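Kafka now has a number of topics that are preloaded with data to query. The ERROR lines about topic metadata are most likely logged while Kafka auto-creates each topic on its first write; the load still completes, as the "Generated ... rows" line for every table shows.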

Step 3: Make the Kafka topics known to openLooKeng

In your openLooKeng installation, add a catalog properties file etc/catalog/kafka.properties for the Kafka connector. This file lists the Kafka nodes and topics:

  connector.name=kafka
  kafka.nodes=localhost:9092
  kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region
  kafka.hide-internal-columns=false

Now start openLooKeng:

  $ bin/launcher start

Because the Kafka tables all have the tpch. prefix in the configuration, the tables are in the tpch schema. The connector is mounted into the kafka catalog because the properties file is named kafka.properties.
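The naming rule described here can be sketched as follows. This is an illustration only, not openLooKeng's code: `split_table_name` is a hypothetical helper, and the exact rule (one dot splits schema from table; anything else is a bare table name in the `default` schema) is an assumption consistent with this tutorial, including the tweets table added later, which has no prefix.

```python
from typing import Tuple


def split_table_name(entry: str, default_schema: str = "default") -> Tuple[str, str]:
    """Split one kafka.table-names entry into (schema, table).

    Assumed rule: an entry with exactly one dot and non-empty parts is
    'schema.table'; anything else is treated as a bare table name that
    belongs to default_schema.
    """
    parts = entry.split(".")
    if len(parts) == 2 and parts[0] and parts[1]:
        return parts[0], parts[1]
    return default_schema, entry


if __name__ == "__main__":
    # The catalog name follows the properties file name: kafka.properties -> kafka.
    catalog = "kafka.properties".rsplit(".", 1)[0]
    table_names = "tpch.customer,tpch.orders,tweets"
    for entry in table_names.split(","):
        schema, table = split_table_name(entry)
        print(f"{entry} -> {catalog}.{schema}.{table}")
```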

Start the openLooKeng CLI:

  $ ./openlk-cli --catalog kafka --schema tpch

List the tables to verify that things are working:

  lk:tpch> SHOW TABLES;
  Table
  ----------
  customer
  lineitem
  nation
  orders
  part
  partsupp
  region
  supplier
  (8 rows)

Step 4: Basic data querying

Kafka data is unstructured, and Kafka has no metadata that describes the format of the messages. Without further configuration, the Kafka connector can access the data and map it in raw form, but there are no actual columns besides the built-in ones:

  lk:tpch> DESCRIBE customer;
   Column            | Type    | Extra | Comment
  -------------------+---------+-------+---------------------------------------------
   _partition_id     | bigint  |       | Partition Id
   _partition_offset | bigint  |       | Offset for the message within the partition
   _segment_start    | bigint  |       | Segment start offset
   _segment_end      | bigint  |       | Segment end offset
   _segment_count    | bigint  |       | Running message count per segment
   _key              | varchar |       | Key text
   _key_corrupt      | boolean |       | Key data is corrupt
   _key_length       | bigint  |       | Total number of key bytes
   _message          | varchar |       | Message text
   _message_corrupt  | boolean |       | Message data is corrupt
   _message_length   | bigint  |       | Total number of message bytes
  (11 rows)

  lk:tpch> SELECT count(*) FROM customer;
  _col0
  -------
  1500

  lk:tpch> SELECT _message FROM customer LIMIT 5;
  _message
  --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  {"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeRb ot,c,E","nationKey":15,"phone":"25-989-741-2988","accountBalance":711.56,"marketSegment":"BUILDING","comment":"to the even, regular platelets. regular, ironic epitaphs nag e"}
  {"rowNumber":3,"customerKey":3,"name":"Customer#000000003","address":"MG9kdTD2WBHm","nationKey":1,"phone":"11-719-748-3364","accountBalance":7498.12,"marketSegment":"AUTOMOBILE","comment":" deposits eat slyly ironic, even instructions. express foxes detect slyly. blithel
  {"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCplrB84WgAiGV6sYpZq7Tj","nationKey":3,"phone":"13-750-942-6364","accountBalance":794.47,"marketSegment":"HOUSEHOLD","comment":"n accounts will have to unwind. foxes cajole accor"}
  {"rowNumber":7,"customerKey":7,"name":"Customer#000000007","address":"TcGe5gaZNgVePxU5kRrvXBfkasDTea","nationKey":18,"phone":"28-190-982-9759","accountBalance":9561.95,"marketSegment":"AUTOMOBILE","comment":"ainst the ironic, express theodolites. express, even pinto bean
  {"rowNumber":9,"customerKey":9,"name":"Customer#000000009","address":"xKiAFTjUsCuxfeleNqefumTrjS","nationKey":8,"phone":"18-338-906-3675","accountBalance":8324.07,"marketSegment":"FURNITURE","comment":"r theodolites according to the requests wake thinly excuses: pending
  (5 rows)

  lk:tpch> SELECT sum(cast(json_extract_scalar(_message, '$.accountBalance') AS double)) FROM customer LIMIT 10;
  _col0
  ------------
  6681865.59
  (1 row)

The data from Kafka can be queried with openLooKeng, but it is not yet in actual table shape. The raw data is available through the _message and _key columns, but it is not decoded into columns. As the sample data is in JSON format, the JSON functions built into openLooKeng can be used to slice the data.
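For comparison, the `json_extract_scalar` query above corresponds roughly to the loop below: parse each `_message` string, pull out the top-level `accountBalance` field as text, cast it to a double, and sum. The sketch uses only the two sample messages from the output above that are shown in full, and `extract_scalar` is a hypothetical stand-in; it illustrates the idea, not how openLooKeng evaluates the query.

```python
import json
from typing import Optional

# The two _message values from the sample output above that are shown in full.
MESSAGES = [
    '{"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeRb ot,c,E","nationKey":15,"phone":"25-989-741-2988","accountBalance":711.56,"marketSegment":"BUILDING","comment":"to the even, regular platelets. regular, ironic epitaphs nag e"}',
    '{"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCplrB84WgAiGV6sYpZq7Tj","nationKey":3,"phone":"13-750-942-6364","accountBalance":794.47,"marketSegment":"HOUSEHOLD","comment":"n accounts will have to unwind. foxes cajole accor"}',
]


def extract_scalar(message: str, field: str) -> Optional[str]:
    """Rough analogue of json_extract_scalar(message, '$.field') for a top-level field."""
    value = json.loads(message).get(field)
    if value is None or isinstance(value, (dict, list)):
        # Missing fields and non-scalar values come back as NULL (None here).
        return None
    return str(value)


# Mirrors sum(cast(json_extract_scalar(_message, '$.accountBalance') AS double)).
total = sum(float(extract_scalar(m, "accountBalance")) for m in MESSAGES)
print(round(total, 2))  # 1506.03
```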

Step 5: Add a topic description file

The Kafka connector supports topic description files that turn raw data into table format. These files are located in the etc/kafka folder of the openLooKeng installation and must end with .json. Naming each file after its table is recommended but not required.
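The file name is optional because the table is identified by the `tableName` (and optional `schemaName`) inside the file. The sketch below illustrates that idea by scanning a directory for `.json` files and indexing them by schema and table. `load_topic_descriptions` and its `default` fallback schema are assumptions for illustration, not openLooKeng's implementation.

```python
import json
from pathlib import Path
from typing import Dict, Tuple


def load_topic_descriptions(directory: str, default_schema: str = "default") -> Dict[Tuple[str, str], dict]:
    """Index every *.json file in directory by the (schema, table) named inside it."""
    root = Path(directory)
    if not root.is_dir():
        return {}
    tables = {}
    for path in sorted(root.glob("*.json")):  # files without the .json suffix are ignored
        description = json.loads(path.read_text(encoding="utf-8"))
        # The table name comes from the file contents, not from the file name.
        schema = description.get("schemaName", default_schema)
        tables[(schema, description["tableName"])] = description
    return tables


if __name__ == "__main__":
    for (schema, table), description in load_topic_descriptions("etc/kafka").items():
        print(f"{schema}.{table} <- topic {description['topicName']}")
```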

Add the following file as etc/kafka/tpch.customer.json and restart openLooKeng:

  {
      "tableName": "customer",
      "schemaName": "tpch",
      "topicName": "tpch.customer",
      "key": {
          "dataFormat": "raw",
          "fields": [
              {
                  "name": "kafka_key",
                  "dataFormat": "LONG",
                  "type": "BIGINT",
                  "hidden": "false"
              }
          ]
      }
  }

The customer table now has an additional column: kafka_key.

  lk:tpch> DESCRIBE customer;
   Column            | Type    | Extra | Comment
  -------------------+---------+-------+---------------------------------------------
   kafka_key         | bigint  |       |
   _partition_id     | bigint  |       | Partition Id
   _partition_offset | bigint  |       | Offset for the message within the partition
   _segment_start    | bigint  |       | Segment start offset
   _segment_end      | bigint  |       | Segment end offset
   _segment_count    | bigint  |       | Running message count per segment
   _key              | varchar |       | Key text
   _key_corrupt      | boolean |       | Key data is corrupt
   _key_length       | bigint  |       | Total number of key bytes
   _message          | varchar |       | Message text
   _message_corrupt  | boolean |       | Message data is corrupt
   _message_length   | bigint  |       | Total number of message bytes
  (12 rows)

  lk:tpch> SELECT kafka_key FROM customer ORDER BY kafka_key LIMIT 10;
  kafka_key
  -----------
  0
  1
  2
  3
  4
  5
  6
  7
  8
  9
  (10 rows)

The topic description file maps the Kafka message key (a raw long stored in eight bytes) onto an openLooKeng BIGINT column.
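To make "a raw long stored in eight bytes" concrete, the sketch below packs and unpacks such a key with Python's `struct` module. It assumes big-endian (network) byte order, which is the default for Java's `ByteBuffer`; check the byte order your producer actually writes before relying on this. The helper names are hypothetical.

```python
import struct

LONG_KEY = struct.Struct(">q")  # signed 64-bit integer, big-endian (assumed byte order)


def encode_long_key(value: int) -> bytes:
    """Pack an integer into the assumed 8-byte layout of a raw LONG key."""
    return LONG_KEY.pack(value)


def decode_long_key(raw: bytes) -> int:
    """Turn exactly 8 key bytes back into a BIGINT-sized Python int."""
    if len(raw) != LONG_KEY.size:
        raise ValueError(f"expected {LONG_KEY.size} bytes, got {len(raw)}")
    (value,) = LONG_KEY.unpack(raw)
    return value


if __name__ == "__main__":
    raw = encode_long_key(7)
    print(raw.hex())             # 0000000000000007
    print(decode_long_key(raw))  # 7
```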

Step 6: Map all the values from the topic message onto columns

Update the etc/kafka/tpch.customer.json file to add fields for the message, and restart openLooKeng. Because the message is JSON, the message section uses the json data format. This shows that the key and the message can use different data formats.

  {
      "tableName": "customer",
      "schemaName": "tpch",
      "topicName": "tpch.customer",
      "key": {
          "dataFormat": "raw",
          "fields": [
              {
                  "name": "kafka_key",
                  "dataFormat": "LONG",
                  "type": "BIGINT",
                  "hidden": "false"
              }
          ]
      },
      "message": {
          "dataFormat": "json",
          "fields": [
              {
                  "name": "row_number",
                  "mapping": "rowNumber",
                  "type": "BIGINT"
              },
              {
                  "name": "customer_key",
                  "mapping": "customerKey",
                  "type": "BIGINT"
              },
              {
                  "name": "name",
                  "mapping": "name",
                  "type": "VARCHAR"
              },
              {
                  "name": "address",
                  "mapping": "address",
                  "type": "VARCHAR"
              },
              {
                  "name": "nation_key",
                  "mapping": "nationKey",
                  "type": "BIGINT"
              },
              {
                  "name": "phone",
                  "mapping": "phone",
                  "type": "VARCHAR"
              },
              {
                  "name": "account_balance",
                  "mapping": "accountBalance",
                  "type": "DOUBLE"
              },
              {
                  "name": "market_segment",
                  "mapping": "marketSegment",
                  "type": "VARCHAR"
              },
              {
                  "name": "comment",
                  "mapping": "comment",
                  "type": "VARCHAR"
              }
          ]
      }
  }

Now columns are defined for all the fields in the JSON message, and the sum query from earlier can operate on the account_balance column directly:

  lk:tpch> DESCRIBE customer;
   Column            | Type    | Extra | Comment
  -------------------+---------+-------+---------------------------------------------
   kafka_key         | bigint  |       |
   row_number        | bigint  |       |
   customer_key      | bigint  |       |
   name              | varchar |       |
   address           | varchar |       |
   nation_key        | bigint  |       |
   phone             | varchar |       |
   account_balance   | double  |       |
   market_segment    | varchar |       |
   comment           | varchar |       |
   _partition_id     | bigint  |       | Partition Id
   _partition_offset | bigint  |       | Offset for the message within the partition
   _segment_start    | bigint  |       | Segment start offset
   _segment_end      | bigint  |       | Segment end offset
   _segment_count    | bigint  |       | Running message count per segment
   _key              | varchar |       | Key text
   _key_corrupt      | boolean |       | Key data is corrupt
   _key_length       | bigint  |       | Total number of key bytes
   _message          | varchar |       | Message text
   _message_corrupt  | boolean |       | Message data is corrupt
   _message_length   | bigint  |       | Total number of message bytes
  (21 rows)

  lk:tpch> SELECT * FROM customer LIMIT 5;
   kafka_key | row_number | customer_key | name               | address                               | nation_key | phone           | account_balance | market_segment | comment
  -----------+------------+--------------+--------------------+---------------------------------------+------------+-----------------+-----------------+----------------+---------------------------------------------------------------------------------------------------------
           1 |          2 |            2 | Customer#000000002 | XSTf4,NCwDVaWNe6tEgvwfmRchLXak        |         13 | 23-768-687-3665 |          121.65 | AUTOMOBILE     | l accounts. blithely ironic theodolites integrate boldly: caref
           3 |          4 |            4 | Customer#000000004 | XxVSJsLAGtn                           |          4 | 14-128-190-5944 |         2866.83 | MACHINERY      | requests. final, regular ideas sleep final accou
           5 |          6 |            6 | Customer#000000006 | sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn  |         20 | 30-114-968-4951 |         7638.57 | AUTOMOBILE     | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious
           7 |          8 |            8 | Customer#000000008 | I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5 |         17 | 27-147-574-9335 |         6819.74 | BUILDING       | among the slyly regular theodolites kindle blithely courts. carefully even theodolites haggle slyly alon
           9 |         10 |           10 | Customer#000000010 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2    |          5 | 15-741-346-9870 |         2753.54 | HOUSEHOLD      | es regular deposits haggle. fur
  (5 rows)

  lk:tpch> SELECT sum(account_balance) FROM customer LIMIT 10;
  _col0
  ------------
  6681865.59
  (1 row)

Now all the fields from the customer topic messages are available as openLooKeng table columns.
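Writing the field list by hand gets tedious for wide messages. The hypothetical helper below (not part of openLooKeng) drafts a message `fields` list from one sample message: it converts each camelCase key to a snake_case column name and guesses a type from the JSON value. Run on the first customer message shown earlier, it reproduces the mapping used in this step. Review the guesses before using the output, since one sample can mislead (a whole-number balance, for example, would be guessed as BIGINT).

```python
import json
import re

# First customer message from the sample output earlier in this tutorial.
SAMPLE = '{"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeRb ot,c,E","nationKey":15,"phone":"25-989-741-2988","accountBalance":711.56,"marketSegment":"BUILDING","comment":"to the even, regular platelets. regular, ironic epitaphs nag e"}'


def to_snake_case(name: str) -> str:
    """camelCase -> snake_case, e.g. accountBalance -> account_balance."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def guess_type(value) -> str:
    """Guess a column type from a single JSON value."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    return "VARCHAR"  # strings, nulls and nested values fall back to VARCHAR


def fields_from_sample(sample: str) -> list:
    """Draft a message 'fields' list from one sample JSON message."""
    return [
        {"name": to_snake_case(key), "mapping": key, "type": guess_type(value)}
        for key, value in json.loads(sample).items()
    ]


if __name__ == "__main__":
    print(json.dumps({"dataFormat": "json", "fields": fields_from_sample(SAMPLE)}, indent=4))
```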

Step 7: Use live data

openLooKeng can query live data in Kafka as it arrives. To simulate a live feed of data, this tutorial sets up a feed of live tweets into Kafka.

Set up a live Twitter feed

  • Download the twistr tool:
      $ curl -o twistr https://repo1.maven.org/maven2/de/softwareforge/twistr_kafka_0811/1.2/twistr_kafka_0811-1.2.sh
      $ chmod 755 twistr
  • Create a developer account at https://dev.twitter.com/ and set up an access token and a consumer key.
  • Create a twistr.properties file and put the access token key and secret, the consumer key and secret, and the Kafka broker address into it:
      twistr.access-token-key=...
      twistr.access-token-secret=...
      twistr.consumer-key=...
      twistr.consumer-secret=...
      twistr.kafka.brokers=localhost:9092

Create a tweets table on openLooKeng

Add the tweets table to the etc/catalog/kafka.properties file:

  connector.name=kafka
  kafka.nodes=localhost:9092
  kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region,tweets
  kafka.hide-internal-columns=false

Add a topic definition file for the Twitter feed as etc/kafka/tweets.json:

  {
      "tableName": "tweets",
      "topicName": "twitter_feed",
      "dataFormat": "json",
      "key": {
          "dataFormat": "raw",
          "fields": [
              {
                  "name": "kafka_key",
                  "dataFormat": "LONG",
                  "type": "BIGINT",
                  "hidden": "false"
              }
          ]
      },
      "message": {
          "dataFormat": "json",
          "fields": [
              {
                  "name": "text",
                  "mapping": "text",
                  "type": "VARCHAR"
              },
              {
                  "name": "user_name",
                  "mapping": "user/screen_name",
                  "type": "VARCHAR"
              },
              {
                  "name": "lang",
                  "mapping": "lang",
                  "type": "VARCHAR"
              },
              {
                  "name": "created_at",
                  "mapping": "created_at",
                  "type": "TIMESTAMP",
                  "dataFormat": "rfc2822"
              },
              {
                  "name": "favorite_count",
                  "mapping": "favorite_count",
                  "type": "BIGINT"
              },
              {
                  "name": "retweet_count",
                  "mapping": "retweet_count",
                  "type": "BIGINT"
              },
              {
                  "name": "favorited",
                  "mapping": "favorited",
                  "type": "BOOLEAN"
              },
              {
                  "name": "id",
                  "mapping": "id_str",
                  "type": "VARCHAR"
              },
              {
                  "name": "in_reply_to_screen_name",
                  "mapping": "in_reply_to_screen_name",
                  "type": "VARCHAR"
              },
              {
                  "name": "place_name",
                  "mapping": "place/full_name",
                  "type": "VARCHAR"
              }
          ]
      }
  }

As this table does not have an explicit schema name, it will be placed into the default schema.
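The `user/screen_name` and `place/full_name` mappings reach into nested JSON objects, one path segment per `/`. The sketch below shows that lookup on a small hand-written, tweet-like object (hypothetical and heavily trimmed; the values are borrowed from the query output later in this step purely for illustration). Returning None for a missing segment is meant to correspond to a NULL column value; that is an assumption about the decoder, not a documented guarantee.

```python
import json
from typing import Any, Optional

# Hypothetical, trimmed tweet; real tweets carry many more attributes.
TWEET = json.loads(
    '{"id_str": "494227746231685121", "lang": "en",'
    ' "user": {"screen_name": "burncaniff"}, "place": null}'
)


def resolve_mapping(document: Any, mapping: str) -> Optional[Any]:
    """Follow a slash-separated mapping such as 'user/screen_name' into nested objects."""
    current = document
    for segment in mapping.split("/"):
        if not isinstance(current, dict) or segment not in current:
            return None  # missing or non-object step: treated as NULL here
        current = current[segment]
    return current


if __name__ == "__main__":
    columns = {
        "id": "id_str",
        "user_name": "user/screen_name",
        "lang": "lang",
        "place_name": "place/full_name",
    }
    for column, mapping in columns.items():
        print(f"{column} = {resolve_mapping(TWEET, mapping)}")
```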

Feed live data

Start the twistr tool:

  $ java -Dness.config.location=file:$(pwd) -Dness.config=twistr -jar ./twistr

twistr connects to the Twitter API and feeds the “sample tweet” feed into a Kafka topic called twitter_feed.

Now run queries against live data:

  $ ./openlk-cli --catalog kafka --schema default
  lk:default> SELECT count(*) FROM tweets;
  _col0
  -------
  4467
  (1 row)

  lk:default> SELECT count(*) FROM tweets;
  _col0
  -------
  4517
  (1 row)

  lk:default> SELECT count(*) FROM tweets;
  _col0
  -------
  4572
  (1 row)

  lk:default> SELECT kafka_key, user_name, lang, created_at FROM tweets LIMIT 10;
   kafka_key          | user_name       | lang | created_at
  --------------------+-----------------+------+-------------------------
   494227746231685121 | burncaniff      | en   | 2014-07-29 14:07:31.000
   494227746214535169 | gu8tn           | ja   | 2014-07-29 14:07:31.000
   494227746219126785 | pequitamedicen  | es   | 2014-07-29 14:07:31.000
   494227746201931777 | josnyS          | ht   | 2014-07-29 14:07:31.000
   494227746219110401 | Cafe510         | en   | 2014-07-29 14:07:31.000
   494227746210332673 | Da_JuanAnd_Only | en   | 2014-07-29 14:07:31.000
   494227746193956865 | Smile_Kidrauhl6 | pt   | 2014-07-29 14:07:31.000
   494227750426017793 | CashforeverCD   | en   | 2014-07-29 14:07:32.000
   494227750396653569 | FilmArsivimiz   | tr   | 2014-07-29 14:07:32.000
   494227750388256769 | jmolas          | es   | 2014-07-29 14:07:32.000
  (10 rows)

There is now a live feed into Kafka which can be queried using openLooKeng.

Epilogue: Time stamps

The tweets feed set up in the last step contains a timestamp in RFC 2822 format as the created_at attribute of each tweet.

  lk:default> SELECT DISTINCT json_extract_scalar(_message, '$.created_at') AS raw_date
           -> FROM tweets LIMIT 5;
  raw_date
  --------------------------------
  Tue Jul 29 21:07:31 +0000 2014
  Tue Jul 29 21:07:32 +0000 2014
  Tue Jul 29 21:07:33 +0000 2014
  Tue Jul 29 21:07:34 +0000 2014
  Tue Jul 29 21:07:35 +0000 2014
  (5 rows)

The topic definition file for the tweets table contains a mapping onto a timestamp using the rfc2822 converter:

  ...
  {
      "name": "created_at",
      "mapping": "created_at",
      "type": "TIMESTAMP",
      "dataFormat": "rfc2822"
  },
  ...

This allows the raw data to be mapped onto an openLooKeng timestamp column:

  lk:default> SELECT created_at, raw_date FROM (
           -> SELECT created_at, json_extract_scalar(_message, '$.created_at') AS raw_date
           -> FROM tweets)
           -> GROUP BY 1, 2 LIMIT 5;
   created_at              | raw_date
  -------------------------+--------------------------------
   2014-07-29 14:07:20.000 | Tue Jul 29 21:07:20 +0000 2014
   2014-07-29 14:07:21.000 | Tue Jul 29 21:07:21 +0000 2014
   2014-07-29 14:07:22.000 | Tue Jul 29 21:07:22 +0000 2014
   2014-07-29 14:07:23.000 | Tue Jul 29 21:07:23 +0000 2014
   2014-07-29 14:07:24.000 | Tue Jul 29 21:07:24 +0000 2014
  (5 rows)
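The seven-hour gap between the two columns is a time zone shift, not a parsing error: the raw strings are in UTC (+0000), while the CLI renders created_at in the session's zone. The sketch below reproduces that with Python's `datetime`, assuming a fixed UTC-7 session zone (consistent with the -0700 offsets in the loader log earlier). The raw strings use the layout `Tue Jul 29 21:07:31 +0000 2014`, with the year last, so the sketch spells that pattern out for `strptime`; `parse_tweet_time` is a hypothetical helper, not the connector's rfc2822 decoder.

```python
from datetime import datetime, timedelta, timezone

# strptime pattern for created_at strings such as "Tue Jul 29 21:07:31 +0000 2014".
# %a and %b expect English day/month abbreviations (C or English locale assumed).
TWEET_TIME_FORMAT = "%a %b %d %H:%M:%S %z %Y"

# Assumed session time zone: a fixed UTC-7 offset (US Pacific daylight time).
SESSION_ZONE = timezone(timedelta(hours=-7))


def parse_tweet_time(raw: str) -> datetime:
    """Parse a created_at string into a timezone-aware datetime."""
    return datetime.strptime(raw, TWEET_TIME_FORMAT)


if __name__ == "__main__":
    instant = parse_tweet_time("Tue Jul 29 21:07:31 +0000 2014")
    print(instant.isoformat())  # 2014-07-29T21:07:31+00:00
    print(instant.astimezone(SESSION_ZONE).strftime("%Y-%m-%d %H:%M:%S.000"))  # 2014-07-29 14:07:31.000
```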

The Kafka connector contains converters for the ISO 8601 and RFC 2822 text formats and for number-based timestamps in seconds or milliseconds since the epoch. There is also a generic, text-based formatter that uses Joda-Time format strings to parse text columns.
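As a rough illustration of the number-based and ISO 8601 inputs listed above, the sketch below turns an ISO 8601 string, epoch seconds, and epoch milliseconds into the same UTC instant using only Python's standard library. It does not reproduce the connector's converters or their option names, and it leaves out the Joda-Time-based formatter; the sample values are chosen to equal the first tweet time shown earlier.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def from_iso8601(text: str) -> datetime:
    """Parse an ISO 8601 timestamp that carries an explicit offset, e.g. 2014-07-29T21:07:31+00:00."""
    return datetime.fromisoformat(text)


def from_epoch_seconds(seconds: int) -> datetime:
    """Interpret a number as seconds since 1970-01-01T00:00:00Z."""
    return EPOCH + timedelta(seconds=seconds)


def from_epoch_millis(millis: int) -> datetime:
    """Interpret a number as milliseconds since the epoch (exact integer arithmetic)."""
    return EPOCH + timedelta(milliseconds=millis)


if __name__ == "__main__":
    for value in (
        from_iso8601("2014-07-29T21:07:31+00:00"),
        from_epoch_seconds(1406668051),
        from_epoch_millis(1406668051000),
    ):
        print(value.isoformat())  # 2014-07-29T21:07:31+00:00, three times
```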