How can I use the new JSON Data Type with Kafka?
Kafka and the JSON Data Type
With the introduction of the new JSON
data type, ClickHouse is now a good choice of database for doing JSON analytics.
In this guide, we're going to learn how to load JSON messages from Apache Kafka directly into a single JSON
column in ClickHouse.
Setup Kafka
Let's start by running a Kafka broker on our machine. We're also going to map port 9092 to port 9092 on our host operating system so that it's easier to interact with Kafka:
docker run --name broker -p 9092:9092 apache/kafka:3.8.1
Ingest data into Kafka
Once that's running, we need to ingest some data.
The Wikimedia recent changes feed is a good source of streaming data, so let's ingest that into the wiki_events
topic:
curl -N https://stream.wikimedia.org/v2/stream/recentchange 2>/dev/null |
awk '/^data: /{gsub(/^data: /, ""); print}' |
jq -cr --arg sep ø '[.meta.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t wiki_events -Kø
We can check that the data is being ingested by running the following command:
kcat -C -b localhost:9092 -t wiki_events
{"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://www.wikidata.org/wiki/Q130972321","request_id":"5c687ded-4721-4bfc-ae6c-58ca25f4a6ce","id":"0fbb0982-c43b-4e8b-989b-db7e78dbdc76","dt":"2024-11-06T11:59:57Z","domain":"www.wikidata.org","stream":"mediawiki.recentchange","topic":"codfw.mediawiki.recentchange","partition":0,"offset":1228777205},"id":2338656448,"type":"edit","namespace":0,"title":"Q130972321","title_url":"https://www.wikidata.org/wiki/Q130972321","comment":"/* wbsetclaim-create:2||1 */ [[Property:P18]]: Mahdi Rrezaei Journalist.jpg","timestamp":1730894397,"user":"Wikimellatir","bot":false,"notify_url":"https://www.wikidata.org/w/index.php?diff=2270885254&oldid=2270870214&rcid=2338656448","minor":false,"patrolled":false,"length":{"old":4269,"new":4636},"revision":{"old":2270870214,"new":2270885254},"server_url":"https://www.wikidata.org","server_name":"www.wikidata.org","server_script_path":"/w","wiki":"wikidatawiki","parsedcomment":"<span dir=\"auto\"><span class=\"autocomment\">Created claim: </span></span> <a href=\"/wiki/Property:P18\" title=\"image | image of relevant illustration of the subject; if available, also use more specific properties (sample: coat of arms image, locator map, flag image, signature image, logo image, collage image)\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"en\" dir=\"ltr\">image</span> <span class=\"wb-itemlink-id\">(P18)</span></span></a>: Mahdi Rrezaei Journalist.jpg"}
{"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://www.wikidata.org/wiki/Q75756596","request_id":"eb116219-7372-4725-986f-790211708d36","id":"9e0d5299-5bd1-4c58-b796-9852afd8a84e","dt":"2024-11-06T11:59:54Z","domain":"www.wikidata.org","stream":"mediawiki.recentchange","topic":"codfw.mediawiki.recentchange","partition":0,"offset":1228777206},"id":2338656449,"type":"edit","namespace":0,"title":"Q75756596","title_url":"https://www.wikidata.org/wiki/Q75756596","comment":"/* wbeditentity-update-languages-and-other:0||55 */ mv labels and aliases matching [[Property:P528]] or [[Property:P3083]] to mul","timestamp":1730894394,"user":"Twofivesixbot","bot":true,"notify_url":"https://www.wikidata.org/w/index.php?diff=2270885237&oldid=2147709089&rcid=2338656449","minor":false,"patrolled":true,"length":{"old":30879,"new":27161},"revision":{"old":2147709089,"new":2270885237},"server_url":"https://www.wikidata.org","server_name":"www.wikidata.org","server_script_path":"/w","wiki":"wikidatawiki","parsedcomment":"<span dir=\"auto\"><span class=\"autocomment\">Changed label, description and/or aliases in 55 languages, and other parts: </span></span> mv labels and aliases matching <a href=\"/wiki/Property:P528\" title=\"catalog code | catalog name of an object, use with qualifier P972\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"en\" dir=\"ltr\">catalog code</span> <span class=\"wb-itemlink-id\">(P528)</span></span></a> or <a href=\"/wiki/Property:P3083\" title=\"SIMBAD ID | identifier for an astronomical object, in the University of Strasbourg's SIMBAD database\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"en\" dir=\"ltr\">SIMBAD ID</span> <span class=\"wb-itemlink-id\">(P3083)</span></span></a> to mul"}
So far, so good.
Ingest data into ClickHouse
Next, we're going to ingest the data into ClickHouse. First, let's enable the JSON type (which is currently experimental), by setting the following property:
SET allow_experimental_json_type = 1;
Now, we'll create the wiki_queue
table, which uses the Kafka
table engine.
CREATE TABLE wiki_queue
(
json JSON
)
ENGINE = Kafka(
'localhost:9092',
'wiki_events',
'clickhouse-consumer-group',
'JSONAsObject'
);
Note that we're using the JSONAsObject
format, which will ensure that incoming messages are made available as a JSON object.
This format can only be parsed into a table that has a single column with the JSON
type.
Next, we'll create the underlying table to store the Wiki data:
CREATE TABLE wiki
(
json JSON,
id String MATERIALIZED getSubcolumn(json, 'meta.id')
)
ENGINE = MergeTree
ORDER BY id;
Finally, let's create a materialized view to populate the wiki
table:
CREATE MATERIALIZED VIEW wiki_mv TO wiki AS
SELECT json
FROM wiki_queue;
Querying JSON data in ClickHouse
We can then write queries against the wiki
table.
For example, we could count the number of bots that have committed changes:
SELECT json.bot, count()
FROM wiki
GROUP BY ALL
┌─json.bot─┬─count()─┐
1. │ true │ 2526 │
2. │ false │ 4691 │
└──────────┴─────────┘
Or we could find out the users that make the most changes on en.wikipedia.org
:
SELECT
json.user,
count()
FROM wiki
WHERE json.server_name = 'en.wikipedia.org'
GROUP BY ALL
ORDER BY count() DESC
LIMIT 10
┌─json.user──────────────────────────────┬─count()─┐
1. │ Monkbot │ 267 │
2. │ Onel5969 │ 107 │
3. │ Bangwiki │ 37 │
4. │ HHH Pedrigree │ 28 │
5. │ REDACTED403 │ 23 │
6. │ KylieTastic │ 22 │
7. │ Tinniesbison │ 21 │
8. │ XTheBedrockX │ 20 │
9. │ 2001:4455:1DB:4000:51F3:6A16:408E:69FC │ 19 │
10. │ Wcquidditch │ 15 │
└────────────────────────────────────────┴─────────┘