Sample Flink Queries

Create table from Kinesis Data Streams

%flink.bsql

CREATE TABLE <table name> (
    `timestamp` TIMESTAMP(3),
    `id` VARCHAR,
    `code` INTEGER,
    WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kinesis',
    'stream' = '<stream name>',
    'aws.region' = '<region code>',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);
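
To confirm that records are arriving from the stream, the table can be probed with a plain streaming select (a minimal sketch; substitute the same `<table name>` used in the DDL above):

%flink.ssql(type=update)

-- Emits every incoming record; stop the paragraph once the data looks correct
SELECT * FROM `<table name>`;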

Create table from Kafka

%flink.bsql

CREATE TABLE sensor (
    `timestamp` TIMESTAMP(3),
    `id` VARCHAR,
    `code` INTEGER,
    WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'sensor',
    'properties.bootstrap.servers' = '<host:port>,<host:port>',
    'properties.security.protocol' = 'SSL',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601',
    'scan.startup.mode' = 'latest-offset'
);

Get data using a sliding window

%flink.ssql(type=update)

SELECT
    `window_start`,
    `window_end`,
    `id`,
    COUNT(*) AS count_total,
    COUNT(CASE WHEN `code` >= 400 AND `code` < 500 THEN 1 END) AS count_4xx,
    COUNT(CASE WHEN `code` >= 500 THEN 1 END) AS count_5xx
FROM TABLE (
    -- 3-minute windows that slide every 1 minute
    HOP(TABLE `<table name>`, DESCRIPTOR(`timestamp`), INTERVAL '1' MINUTES, INTERVAL '3' MINUTES)
)
GROUP BY
    `window_start`, `window_end`, `id`;
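
If overlapping windows are not needed, the same counts can be computed over fixed, non-overlapping windows with the TUMBLE table-valued function (a sketch assuming the same table and columns as above):

%flink.ssql(type=update)

SELECT
    `window_start`, `window_end`, `id`,
    COUNT(*) AS count_total,
    COUNT(CASE WHEN `code` >= 500 THEN 1 END) AS count_5xx
FROM TABLE (
    -- fixed 1-minute windows, no overlap
    TUMBLE(TABLE `<table name>`, DESCRIPTOR(`timestamp`), INTERVAL '1' MINUTES)
)
GROUP BY
    `window_start`, `window_end`, `id`;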

Change the time zone in a sliding window

%flink.ssql(type=update)

SELECT
    CONVERT_TZ(DATE_FORMAT(`window_start`, 'yyyy-MM-dd HH:mm:00'), 'UTC', 'Europe/Amsterdam') AS window_start,
    CONVERT_TZ(DATE_FORMAT(`window_end`, 'yyyy-MM-dd HH:mm:00'), 'UTC', 'Europe/Amsterdam') AS window_end
FROM TABLE (
    HOP(TABLE `<table name>`, DESCRIPTOR(`timestamp`), INTERVAL '1' MINUTES, INTERVAL '3' MINUTES)
)
GROUP BY
    `window_start`, `window_end`;
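
The two queries above can also be combined, reporting the converted window boundaries alongside the per-window counts (a sketch reusing the same placeholder table and columns):

%flink.ssql(type=update)

SELECT
    -- convert the UTC window boundaries to local time for display
    CONVERT_TZ(DATE_FORMAT(`window_start`, 'yyyy-MM-dd HH:mm:00'), 'UTC', 'Europe/Amsterdam') AS window_start_local,
    CONVERT_TZ(DATE_FORMAT(`window_end`, 'yyyy-MM-dd HH:mm:00'), 'UTC', 'Europe/Amsterdam') AS window_end_local,
    COUNT(CASE WHEN `code` >= 400 AND `code` < 500 THEN 1 END) AS count_4xx,
    COUNT(CASE WHEN `code` >= 500 THEN 1 END) AS count_5xx
FROM TABLE (
    HOP(TABLE `<table name>`, DESCRIPTOR(`timestamp`), INTERVAL '1' MINUTES, INTERVAL '3' MINUTES)
)
GROUP BY
    `window_start`, `window_end`;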