You can get useful information from a table of Kafka data by running typical queries,
        such as counting the number of records streamed within an interval of time or defining a
        view of streamed data over a period of time.
        This task requires Kafka 0.11 or later to support time-based lookups and prevent full stream scans.
        This task assumes you created a table named kafka_table for a Kafka
            stream.
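For reference, a table such as kafka_table might be defined roughly as follows. This is a minimal sketch, not the definition this task depends on: the topic name wiki-edits and the broker address localhost:9092 are hypothetical placeholders, the column list is only inferred from the queries in this task, and the topic is assumed to carry JSON records whose fields match these columns.

```sql
-- Sketch of a Kafka-backed external table (topic and broker are hypothetical).
CREATE EXTERNAL TABLE kafka_table (
  `timestamp` TIMESTAMP,
  `user`      STRING,
  delta       BIGINT,
  added       BIGINT,
  deleted     BIGINT
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
  "kafka.topic" = "wiki-edits",                 -- assumed topic name
  "kafka.bootstrap.servers" = "localhost:9092"  -- assumed broker address
);
```

The storage handler adds Kafka metadata columns such as `__timestamp`, `__partition`, and `__offset` to the table, which is why the queries in this task can filter on `__timestamp` without declaring it.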
        - 
                List the table properties and all the partition or offset information for the topic.
                DESCRIBE EXTENDED kafka_table;
 
- 
                Count the number of Kafka records that have timestamps within the past 10
                    minutes.
                
                    SELECT COUNT(*) FROM kafka_table 
  WHERE `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);  
 
 Such a time-based seek requires a Kafka broker of version 0.11 or later, which
                    supports time-based lookups; otherwise, this query leads to a full stream
                    scan. 
- 
                Define a view of data consumed within the past 15 minutes that exposes only specific columns.
                CREATE VIEW last_15_minutes_of_kafka_table AS SELECT `timestamp`, `user`, delta, 
  added, deleted FROM kafka_table 
  WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '15' MINUTES);
 
- 
                Create a dimension table.
                CREATE TABLE user_table (`user` string, `first_name` string, age int, gender string, comments string) STORED AS ORC;
 
- 
                Join the view of the stream over the past 15 minutes to
                        user_table, group by gender, and compute aggregates over
                    metrics from the fact table and the dimension table.
                    SELECT SUM(added) AS added, SUM(deleted) AS deleted, AVG(delta) AS delta, AVG(age) AS avg_age , gender 
  FROM last_15_minutes_of_kafka_table  
  JOIN user_table ON `last_15_minutes_of_kafka_table`.`user` = `user_table`.`user`
  GROUP BY gender LIMIT 10;
 
 
- 
                Perform a classical user retention analysis over the Kafka stream by using
                    stream-to-stream joins. The first query runs ad hoc on a view defined over
                    the past 15 minutes; the second runs over the entire stream.
                
                    -- Stream join over the view itself
-- Assuming l15min_wiki is a view of the last 15 minutes
SELECT  COUNT( DISTINCT activity.`user`) AS active_users, 
COUNT(DISTINCT future_activity.`user`) AS retained_users
FROM l15min_wiki AS activity
LEFT JOIN l15min_wiki AS future_activity ON activity.`user` = future_activity.`user`
AND activity.`timestamp` = future_activity.`timestamp` - interval '5' minutes ; 
                    
--  Stream-to-stream join
-- Assuming wiki_kafka_hive is the entire stream.
SELECT floor_hour(activity.`timestamp`), COUNT(DISTINCT activity.`user`) AS active_users, 
COUNT(DISTINCT future_activity.`user`) AS retained_users
FROM wiki_kafka_hive AS activity
LEFT JOIN wiki_kafka_hive AS future_activity ON activity.`user` = future_activity.`user`
AND activity.`timestamp` = future_activity.`timestamp` - interval '1' hour 
GROUP BY floor_hour(activity.`timestamp`);
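
The two counts returned by the first retention query can be combined into a single retention rate. The following is a sketch only: it reuses the assumed l15min_wiki view, and the retention_rate column and the counts alias are hypothetical names that do not appear in the original queries.

```sql
-- Sketch: share of users active in the window who were active again 5 minutes later.
SELECT active_users,
       retained_users,
       retained_users / active_users AS retention_rate  -- hypothetical derived column
FROM (
  SELECT COUNT(DISTINCT activity.`user`)        AS active_users,
         COUNT(DISTINCT future_activity.`user`) AS retained_users
  FROM l15min_wiki AS activity
  LEFT JOIN l15min_wiki AS future_activity
    ON activity.`user` = future_activity.`user`
   AND activity.`timestamp` = future_activity.`timestamp` - interval '5' minutes
) counts;
```

Because the join is a LEFT JOIN, users with no matching later activity still count toward active_users but contribute nothing to retained_users, so the ratio stays between 0 and 1 whenever the view contains any rows.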