Project

General

Profile

{{show_solution}}

Hive

Certains traitements nécessitent l'écriture de ses propres jobs MapReduce car ils sont trop particuliers ou trop complexes pour être exprimés dans un langage de plus haut niveau, ou bien lorsque l'interpréteur de ce langage ne permet pas de le traiter aussi efficacement. Cependant, de nombreux problèmes peuvent être exprimés dans un langage de plus haut niveau tel que le SQL.

Hive traduit des requêtes écrites en HiveQL (un dialecte de SQL influencé par MySQL) en un workflow de jobs MapReduce, puis soumet ces jobs sur le cluster Hadoop. Il ne s'agit donc pas d'une base de données, mais d'une couche d'abstraction au dessus du framework MapReduce.

Création de table

Rentrer dans le shell de Hive :

hive

Creer une table :

CREATE TABLE records (year STRING, temperature INT, quality INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

Où sont stockées les données de cette table?

Le fichier sample-with-tab.txt contient un court extrait de données météorologique, dont les champs sont délimités par une tabulation. Remplir la table "records" avec le contenu de ce fichier.

LOAD DATA LOCAL INPATH 'workspace/data/sample-with-tab.txt'
OVERWRITE INTO TABLE records;

Vérifier que tout a été créé comme attendu :

SHOW TABLES;
DESCRIBE records;
SELECT * FROM records;

Vérifier que les champs de la table sont correctement mappés en faisant exécuter à Hive un job MapReduce simple :

SELECT DISTINCT year FROM records;

D'autres formats de fichier peuvent être utilisés par Hive grâce aux serialiseurs/déserialiseurs (SERDE). Utiliser le SERDE basé sur les expressions régulières pour importer dans Hive le fichier 'stations-fixed-width.txt' contenant des champs de taille fixe :

CREATE TABLE stations (usaf STRING, wban STRING, name STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES ( "input.regex" = "(\\d{6}) (\\d{5}) (.{29}) .*" );
LOAD DATA LOCAL INPATH 'workspace/data/stations-fixed-width.txt'
OVERWRITE INTO TABLE stations;

Vérifier que les champs de la table sont correctement mappés en faisant exécuter à Hive un job MapReduce simple :

SELECT * FROM stations WHERE name LIKE 'PARIS%';

Comme vous pouvez le constater en exécutant la commande hadoop fs -cat /user/hive/warehouse/records/sample-with-tab.txt (à l'extérieur du shell de Hive), hive copie les fichiers contenant les enregistrements de la table sous un répertoire situé dans son espace HDFS (/user/hive/warehouse). Ce répertoire porte le nom de la table. Ce mode de fonctionnement permet à Hive de gérer la table indépendamment du fichier de données d'origine mais il nécessite de dupliquer ce fichier d'origine.

DROP TABLE records;

Création de table externe

table externe avec tabulations

Si les données sont également utilisées en dehors de Hive et qu'on souhaite éviter de les dupliquer, il faut alors utiliser le mode "externe":

CREATE EXTERNAL TABLE records (year STRING, temperature INT, quality INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/cloudera/records/'

Charger les données tabulées:

LOAD DATA LOCAL INPATH 'workspace/data/sample-with-tab.txt'
OVERWRITE INTO TABLE records;

Vérifier le type et l'emplacement de la nouvelle table 'records':

DESCRIBE FORMATTED records;

Notez que dans ce mode d'utilisation, la commande DROP TABLE supprimera la table dans Hive, mais pas le fichier de données. Vous pouvez le vérifier en supprimant puis recréant la table 'records'.

table externe avec champs de taille fixe

Importer (sans les copier!) les fichiers météo utilisés dans le TP Hadoop dans une table records2(station STRING, year STRING, temperature INT, quality INT) :
  • la décompression gzip est supportée de façon implicite.
  • dans vos règles de mapping, pensez à supprimer le signe '+' (non reconnu) tout en conservant le signe '-'.

Vérifier que les champs de la table sont correctement mappés en faisant exécuter à Hive un job MapReduce simple.

SELECT * FROM records2 LIMIT 4;

{{start_solution}}

CREATE EXTERNAL TABLE records2 (station STRING, year STRING, temperature INT, quality INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES ("input.regex"=".{4}(.{6}).{5}(.{4}).{68}\\+?(-?\\d{4})(\\d).*")
LOCATION '/user/cloudera/records2'
{{end_solution}}

Intégration de script

Hive permet d'invoquer un script externe lorsque le traitement à effectuer n'est pas possible avec le langage HiveQL (ce qui n'est pas le cas de l'exemple ci-dessous):

#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
    (year, temp, q) = line.strip().split()
    if (temp != "9999" and re.match("[01459]", q)):
        print "%s\t%s" % (year, temp)

ADD FILE /home/cloudera/workspace/is_good_quality.py;
FROM records
SELECT TRANSFORM(year, temperature, quality)
USING 'is_good_quality.py'
AS year, temperature;

Comprendre et exécuter cet exemple.

Réécrire cet exemple en pur SQL, sans utiliser de script python.

{{start_solution}}

SELECT year, temperature
FROM records2
WHERE temperature != 9999
  AND (quality=0 OR quality=1 OR quality=4 OR quality=5 OR quality=9);
{{end_solution}}

Notez qu'il est possible d'écrire des fonctions utilisateur (User Defined Funtions/UDF) sous forme de plugins Java: https://cwiki.apache.org/confluence/display/Hive/HivePlugins

Agrégats

Récrire en HiveQL les premiers jobs MapReduce d'hier en utilisant les fonctions d'agrégation de HiveQL:

  • calcul de la température maximum par année.
    • en pur HiveQL {{start_solution}}
      SELECT year, MAX(temperature)
      FROM records2
      WHERE temperature != 9999
        AND (quality=0 OR quality=1 OR quality=4 OR quality=5 OR quality=9)
      GROUP BY year;
      
      {{end_solution}}
    • en faisant appel au script is_good_quality.py.
      ATTENTION: la sortie produite par le script est un ensemble de String qu'il faut éventuellement caster avec cast(temperature as int) {{start_solution}}
      SELECT rec.year, max(cast(rec.temperature as int))
      FROM (
        FROM records2
        SELECT TRANSFORM(year, temperature, quality)
        USING 'is_good_quality.py'
        AS year, temperature
      ) rec
      GROUP BY rec.year;
      
      {{end_solution}}
  • calcul de la température moyenne par année. {{start_solution}}
    SELECT year, AVG(temperature)
    FROM records2
    WHERE temperature != 9999
      AND (quality=0 OR quality=1 OR quality=4 OR quality=5 OR quality=9)
    GROUP BY year;
    
    {{end_solution}}

Requêtes imbriquées

HiveQL supporte les requêtes imbriquées. Tirez profit de cette possibilité pour écrire les requêtes suivantes:

  • calculer la moyenne de l'ensemble des températures maximum de chaque couple année/station météo: {{start_solution}}
    SELECT AVG(max_temperature)
    FROM (
        SELECT station, year, MAX(temperature) AS max_temperature
        FROM records2
        WHERE temperature != 9999
          AND (quality=0 OR quality=1 OR quality=4 OR quality=5 OR quality=9)
        GROUP BY station, year
    ) mt;
    
  • pour chaque tranche de 5°C, calcul du nombre de stations ayant une température moyenne comprise dans cette tranche (voir dernier exercice des TP de la veille). {{start_solution}}
    SELECT FLOOR(avg_temperature/50), COUNT(station)
    FROM (
        SELECT station, AVG(temperature) AS avg_temperature
        FROM records2
        WHERE temperature != 9999
          AND (quality=0 OR quality=1 OR quality=4 OR quality=5 OR quality=9)
        GROUP BY station
    ) mt
    GROUP BY FLOOR(avg_temperature/50);
    
    {{end_solution}}

Jointures

HiveQL supporte les jointures, mais à condition de les expliciter (la clause FROM n'accepte qu'un seul nom de table). L'exemple suivant donne la liste des stations météo enregistrées dans la base en 1901 et 1902:

SELECT DISTINCT name
FROM stations
JOIN records2 ON (records2.station = stations.usaf)
WHERE year = '1901' OR year = '1902';

Ecrire en HiveQL les requêtes suivantes:
  • calcul de la température moyenne de l'ensemble des stations météo UTO et TURKU. {{start_solution}}
    SELECT AVG(temperature)
    FROM stations
    JOIN records2 ON (records2.station = stations.usaf)
    WHERE name LIKE 'UTO%' OR name LIKE 'TURKU%';
    
    {{end_solution}}
  • calcul de la température moyenne de la station météo UTO, et de celle de TURKU. {{start_solution}}
    SELECT name, AVG(temperature)
    FROM stations
    JOIN records2 ON (records2.station = stations.usaf)
    WHERE name LIKE 'UTO%' OR name LIKE 'TURKU%'
    GROUP BY name;
    
    {{end_solution}}

ATTENTION: les noms des stations peuvent comporter des espaces aux extrémités: utiliser TRIM()

Tri

HiveQL supporte le tri du résultat de la requête avec la clause standard ORDER BY, mais celle-ci réduit considérablement la performance du traitement puisqu'elle nécessite de regrouper tous les résultats intermédiaires en une seule tâche "reduce" afin de les trier entre eux. Une prochaine version de Hive améliorera ce point en supportant un tri parallèle efficace.

Lorsque le tri n'a pas besoin d'être global sur l'ensemble du résultat de la requête, vous pouvez utiliser l'extension non-standard de Hive SORT BY à la place de ORDER BY, afin de trier chacune des parties de résultat produites par les différentes tâches "reduce". La clause DISTRIBUTE BY de Hive permet de contrôler la façon dont les données intermédiaires produites par les tâches "map" sont réparties dans les tâches "reduce".

La requête suivante tri les données météo par température décroissante au sein de chaque année, et les années croissantes:

SELECT year, temperature
FROM records2
DISTRIBUTE BY year
SORT BY year ASC, temperature DESC;

Exécuter cet exemple.