Project

General

Profile

{{show_solution(21)}}

MapReduce

MapReduce avec Hadoop Streaming

Les exercices suivant utilisent la base de données météo du NCDC. Cette base est constituée d'une ligne par enregistrement, dont on veut extraire l'année et la température mesurée.

0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...

Exercice 1

On veut extraire de la base de données météo du NCDC la température la plus haute, pour chaque année. Sur un petit jeu de données (par exemple les données météo des années 1901 et 1902), ceci peut se faire à l'aide d'un simple script shell:

#!/usr/bin/env bash
for year in all/*
do
    echo -ne `basename $year .gz`"\t" 
    gunzip -c $year | \
        awk '{ temp = substr($0, 88, 5) + 0;
            q = substr($0, 93, 1);
            if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
            END { print max }'
done

Ce script se trouve dans /home/cloudera/workspace/data
Comprendre et tester ce script.

Exercice 2

A partir d'une certaine quantité de données, il devient intéressant de distribuer ce calcul. Le design pattern MapReduce implémenté par Hadoop est un moyen simple et efficace d'y parvenir. Voici le même traitement, écrit en langage ruby, suivant le design pattern MapReduce:

Fichier max_temperature_map.rb:

#!/usr/bin/env ruby
STDIN.each_line do |line|
    val = line
    year, temp, q = val[15,4], val[87,5], val[92,1]
    puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

Fichier max_temperature_reduce.rb:

#!/usr/bin/env ruby
last_key, max_val = nil, 0
STDIN.each_line do |line|
    key, val = line.split("\t")
    if last_key && last_key != key
        puts "#{last_key}\t#{max_val}" 
        last_key, max_val = key, val.to_i
    else
        last_key, max_val = key, [max_val, val.to_i].max
    end
end
puts "#{last_key}\t#{max_val}" if last_key

Le pipeline MapReduce peut être simulé sur un petit jeu de données (données météo des années 1901 et 1902) en utilisant un simple pipeline Unix:

cat sample.txt | ./max_temperature_map.rb | sort | ./max_temperature_reduce.rb

Exécuter ce traitement. Les scripts se trouvent dans /home/cloudera/workspace/data.
Vérifier que l'on obtient les mêmes résultats que dans l'exercice 1.

Exercice 3

On peut faire exécuter ce job par hadoop. C'est que qu'on appelle Hadoop Streaming.

cd /home/cloudera/workspace
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.0.jar \
    -input file:///home/cloudera/workspace/data/all/ -output output \
    -mapper max_temperature_map.rb -reducer max_temperature_reduce.rb \
    -file max_temperature_map.rb -file max_temperature_reduce.rb

Exécuter cette tâche. Notez que n'importe quel langage peut-être utilisé.

MapReduce en Java

Bien que hadoop puisse être utilisé avec n'importe quel langage grâce au streaming, certaines fonctionnalité ne sont accessibles que pour le langage Java.

Nous commencerons donc par refaire les étapes précédentes en langage Java avant d'aller plus loin.

NOTE: vous pouvez utiliser git pour conserver un historique de vos développements successifs (pas de push vers un serveur central).

Exercice 1

Le programme Java suivant fait la même chose que le script shell/awk vu dans le premier exercice MapReduce. Compiler et exécuter ce programme:

  • lancer Eclipse
  • dans le projet Training, créer un nouveau package maxtemp
  • y créer une classe TemperatureMax contenant le code ci-dessous
  • résoudre les problèmes d'imports manquants (placer la souris sur les mots soulignées en rouge)
  • prendre le temps de comprendre le code
  • créer un jar: clic droit sur "Training" -> Export -> Java -> JAR file -> Next -> JAR file: /home/cloudera/workspace/tp.jar -> Finish.
  • dans le terminal X:
    cd workspace/data
    java -cp ../tp.jar maxtemp.TemperatureMax
    
public class TemperatureMax {

    private static final int MISSING = 9999;

    public static void main(String[] args) throws IOException {
        // calcul des max
        Map<String,Integer> map = new HashMap<String,Integer>();
        for (File file : new File("all/").listFiles()) {
            BufferedReader reader = new BufferedReader(new InputStreamReader(
                    new java.util.zip.GZIPInputStream(new FileInputStream(file))));

            for (String line=reader.readLine(); line!=null; line=reader.readLine()) {
                String year = line.substring(15, 19);
                int airTemperature;
                if (line.charAt(87) == '+') { // parseInt ne reconnait pas le signe '+'
                    airTemperature = Integer.parseInt(line.substring(88, 92));
                } else {
                    airTemperature = Integer.parseInt(line.substring(87, 92));
                }
                String quality = line.substring(92, 93);
                if (airTemperature != MISSING && quality.matches("[01459]")) {
                    Integer oldValue = map.get(year);
                    if (oldValue != null)
                        map.put(year, Math.max(oldValue, airTemperature));
                    else    
                        map.put(year, airTemperature);
                }

            }
            reader.close();
        }

        // affichage
        for (String key : map.keySet()) {
            System.out.println(key+"\t"+map.get(key));
        }
    }
}

Exercice 2

Dans la classe TemperatureMax, les phases de map et de reduce sont confondues. Modifier cette classe de façon à dissocier ces 2 phases dans 2 boucles distinctes:
  • partie map: stocker toutes les températures valides dans une structure HashMap<String, List<Integer>>
  • partie reduce: parcourir toutes les valeurs stockées pour trouver et afficher la température maximum de chaque année.

Vous pouvez vous aider de la fonction suivante pour remplir la map:

private static void write(Map<String,List<Integer>> map, String key, Integer value) {
        if (!map.containsKey(key)) {
                map.put(key, new ArrayList<Integer>());
        }
        map.get(key).add(value);
}

{{start_solution}}

public class TemperatureMax2 {

    private static final int MISSING = 9999;

    public static void main(String[] args) throws IOException {
        // map
        Map<String,List<Integer>> map = new HashMap<String,List<Integer>>();
        for (File file : new File("all/").listFiles()) {
            BufferedReader reader = new BufferedReader(new InputStreamReader(
                    new GZIPInputStream(new FileInputStream(file))));
            for (String line=reader.readLine(); line!=null; line=reader.readLine()) {

                String year = line.substring(15, 19);
                int airTemperature;
                if (line.charAt(87) == '+') { // parseInt ne reconnait pas le signe '+'
                    airTemperature = Integer.parseInt(line.substring(88, 92));
                } else {
                    airTemperature = Integer.parseInt(line.substring(87, 92));
                }
                String quality = line.substring(92, 93);
                if (airTemperature != MISSING && quality.matches("[01459]")) {
                    write(map, year, airTemperature);
                }

            }
            reader.close();
        }

        // reduce
        for (String key : map.keySet()) {

            int maxValue = Integer.MIN_VALUE;
            for (Integer value : map.get(key)) {
                maxValue = Math.max(maxValue, value);
            }
            System.out.println(key+"\t"+maxValue);

        }
    }

    private static void write(Map<String,List<Integer>> map, String key, Integer value) {
        if (!map.containsKey(key)) {
            map.put(key, new ArrayList<Integer>());
        }
        map.get(key).add(value);
    }
}
{{end_solution}}

Exercice 3

Nous allons maintenant transformer notre code en véritable job MapReduce, parallélisable sur un cluster.
Un job est constitué de 3 classes: Driver (main), Mapper, et Reducer.
Vous retrouverez 3 squelettes pour ces classes dans le projet "training": StubDriver, StubMapper, StubReducer.

Hadoop manipule les données sous forme de <clef, valeur>, et utilise des types sérialisables (Writable) et optimisés pour l'échange de données sur le réseau. Utiliser les mécanismes de sérialisation du langage Java dans ce contexte présenterait en effet plusieurs inconvénients:
  • elle n'est pas assez compacte car trop générique,
  • elle ne peut pas être utilisée de façon efficace car on ne peut pas dé-sérialiser dans un pool d'objets,
  • son extensibilité n'est pas optimum,
  • l’interopérabilité avec d'autres langages (par exemple un map en java, un reduce en python) est difficile car son format de fichier est spécifique au langage Java.

Le mapper et le reducer communiquent par le context, qu'ils reçoivent en paramètre: on utilise context.write() pour passer les données au processus suivant.
L'API Java est de la forme:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
        // ...
    }
}
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
        // ...
    }
}

La classe StubMapper est de type generique, avec 4 paramètres qui spécifient les types de: KEYIN, VALUEIN, KEYOUT, VALUEOUT.

  • imaginez un cas d'utilisation possible de la clef du mapper.
  • quel est l'intérêt d'utiliser le type Text au lieu de IntWritable pour la clef du reducer (i.e. l'année des relevés de température)?
  • dans quel cas faudrait-il utiliser plutôt le type IntWritable?

Insérer le code suivant dans StubDriver.java (à la place du commentaire TODO: implement):

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(StubMapper.class);
    job.setReducerClass(StubReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
Noter particulièrement:
  • la magie des fichiers d'entrée/sortie (FileInputFormat.addInputPath et FileOutputFormat.setOutputPath): on ne spécifie pas les fichiers, ni le fait qu'ils sont compressés
  • les types du résultat (job.setOutputKeyClass et job.setOutputValueClass) qui sont propres à notre cas

Insérer ensuite votre code de TemperatureMax2 dans les squelettes de classes.

ATTENTION: les types des données d'entré/sortie pré-remplis sont proposés en guise d'exemple et ne correspondent pas forcément tous à votre cas d'utilisation.
  • utiliser les types d'Hadoop (qui permettent à Hadoop d'optimiser le transfert des données sur le réseau) à la place des types standards de Java: par exemple utiliser Text au lieu de String, IntWritable au lieu de Integer.
  • pour les calculs, préférer les types Java aux types Hadoop
  • écrire dans l'objet context founi par Hadoop (context.write()) à la place de la sortie standard.

{{start_solution}}

public class StubMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        final int MISSING = 9999;
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt ne reconnait pas le signe '+'
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
{{end_solution}}

{{start_solution}}

public class StubReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));

    }
}
{{end_solution}}

{{start_solution}}

public class StubDriver {

  public static void main(String[] args) throws Exception {

    /*
     * Validate that two arguments were passed from the command line.
     */
    if (args.length != 2) {
      System.out.printf("Usage: StubDriver <input dir> <output dir>\n");
      System.exit(-1);
    }

    /*
     * Instantiate a Job object for your job's configuration. 
     */
    Job job = new Job();

    /*
     * Specify the jar file that contains your driver, mapper, and reducer.
     * Hadoop will transfer this jar file to nodes in your cluster running 
     * mapper and reducer tasks.
     */
    job.setJarByClass(StubDriver.class);

    /*
     * Specify an easily-decipherable name for the job.
     * This job name will appear in reports and logs.
     */
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(StubMapper.class);
    job.setReducerClass(StubReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    /*
     * Start the MapReduce job and wait for it to finish.
     * If it finishes successfully, return 0. If not, return 1.
     */
    boolean success = job.waitForCompletion(true);
    System.exit(success ? 0 : 1);
  }
}
{{end_solution}}

Soumettez votre job sur Hadoop:
  • d'abord avec de petits fichiers locaux:
    export HADOOP_CLASSPATH=tp.jar
    hadoop StubDriver file:///home/cloudera/workspace/data/all output
    
  • puis avec les fichiers déployés dans HDFS:
    hadoop StubDriver all output2
    
    {{note(HADOOP CLUSTER GROUP 1)}}

Exercice 4

La plupart des traitements MapReduce étant limités par la bande passante disponible sur le cluster, il est souvent bénéfique de minimiser la quantité de données transférées entre les taches map et les taches reduce. Hadoop permet à l'utilisateur de spécifier une fonction Combiner à exécuter sur le résultat de la fonction map. Le résultat de la fonction Combiner devient alors l'entrée de la fonction reduce.

La fonction Combiner implémente l'interface Reducer, tout comme la fonction reduce dont elle peut dans certains cas réutiliser le code tel quel.

Ajouter la ligne suivante dans votre StubDriver et réexécuter votre code:

job.setCombinerClass(StubReducer.class);

Noter que l'utilisation de StubReducer comme Combiner est rendu possible par le fait que notre fonction Reduce accepte en entrée les mêmes types qu'en sortie.

Exercice 5

Modifier votre fonction reduce de façon à calculer, pour chaque année, la moyenne des températures au lieu de la température maximum.
  • modifier uniquement votre fonction reduce et garder votre fonction map inchangée
  • la méthode reduce() est appelée une et une seule fois pour chaque valeur de clef. En quoi cette propriété du MapReduce est-elle importante pour résoudre notre problème?
  • pourquoi faut-il supprimer votre fonction "Combiner" de la classe StubDriver pour ce traitement ?

{{start_solution}}

public class StubReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int sum=0, count=0;
        for (IntWritable value : values) {
            sum += value.get();
            count++;
        }
        context.write(key, new IntWritable(sum/count));

    }
}
{{end_solution}}

Soumettez votre job sur Hadoop. Vous devez obtenir:

1901    46
1902    21

{{note(HADOOP CLUSTER GROUP 2)}}

Exercice 6

Exercice 6.1

Nous allons maintenant utiliser une fonction Combiner afin réduire la quantité de données échangée entre les nœuds du cluster.

La fonction Combiner étant une optimisation, elle peut être appelée zero, une ou plusieurs fois selon les cas rencontrés par Hadoop lors de l'exécution des traitement. Il est donc important que la fonction Reduce puisse traiter indifféremment le résultat de la fonction map ou d'un ou plusieurs appels à la fonction Combiner. Ce qui implique notamment que la classe Combiner doit avoir les même types en entrée et en sortie.

  • pourquoi la fonction reduce ne peut-elle pas être ré-utilisée telle quelle?
  • imaginer un processus selon lequel le Reducer pourrait être appelé/chaîné zéro ou plusieurs fois, avec un résultat identique. En particulier, spécifier les données qui doivent être échangées entre le Mapper, les Combiners et le Reducer. Noter que Hadoop permet d'utiliser des structures de données complexes.

Exprimer la spécification précédente en écrivant les signatures des classes correspondantes, et en vous aidant de la classe TemperatureAveragePair pré-installée dans votre projet Eclipse:

public class StubMapper extends Mapper<..., ..., ..., ...> {}
public class StubCombiner extends Reducer<..., ..., ..., ...> {}
public class StubReducer extends Reducer<..., ..., ..., ...> {}

{{start_solution}}

public class StubMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {}
public class StubCombiner extends Reducer<Text, TemperatureAveragingPair, Text, TemperatureAveragingPair> {}
public class StubReducer extends Reducer<Text, TemperatureAveragingPair, Text, IntWritable> {}
{{end_solution}}

Exercice 6.2

Dans un premier temps, modifier et tester le programme sans utiliser la fonction "Combiner" (commenter job.setCombinerClass)

ATTENTION: lorsqu'ils ne sont pas spécifiés, Hadoop suppose que les types du résultat de la fonction map (job.setMapOutputKeyClass et job.setMapOutputValueClass) sont les mêmes que ceux de la fonction reduce.

{{start_solution}}

public class StubMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        final int MISSING = 9999;
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt ne reconnait pas le signe '+'
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new TemperatureAveragingPair(airTemperature, 1));
        }
    }
}
{{end_solution}}

{{start_solution}}

public class StubReducer extends Reducer<Text, TemperatureAveragingPair, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context)
            throws IOException, InterruptedException {

        int sum=0, count=0;
        for (TemperatureAveragingPair pair : values) {
            sum += pair.getTemp().get();
            count += pair.getCount().get();
        }
        context.write(key, new IntWritable(sum/count));

    }
}
{{end_solution}}

{{start_solution}}

    // à ajouter dans la classe StubDriver
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(TemperatureAveragingPair.class);
{{end_solution}}

Exercice 6.3

Implémenter et faire appel à la fonction "Combiner" (dé-commenter job.setCombinerClass)

Soumettez votre job sur Hadoop.

{{start_solution}}

public class StubCombiner extends Reducer<Text, TemperatureAveragingPair, Text, TemperatureAveragingPair> {

    @Override
    public void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context)
            throws IOException, InterruptedException {

        int sum=0, count=0;
        for (TemperatureAveragingPair pair : values) {
            sum += pair.getTemp().get();
            count += pair.getCount().get();
        }
        context.write(key, new TemperatureAveragingPair(sum, count));

    }
}
{{end_solution}}

{{note(HADOOP CLUSTER GROUP 3)}}

Exercice 7

Exercice 7.1

Pour chaque tranche de 5°C, donner le nombre de stations ayant une température moyenne (toutes années confondues) comprise dans cette tranche:
  • le code de la station est récupérable avec line.substring(4, 10)
  • décomposer un problème complexe en plusieurs jobs MapReduce permet d'améliorer la compréhension, la testabilité et la réutilisabilité de votre code.
Plusieurs méthodes existent pour chaîner des jobs MapReduce:
  • pour un chaînage linéaire, la méthode la plus simple consiste à déclarer plusieurs jobs dans le Driver, et les faire communiquer au moyen de fichiers, par exemple en utilisant le format SequenceFile de Hadoop:
        // ecriture
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputPath(job, new Path("output-temp"));
        // lecture
        job.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.setInputPaths(job, new Path("output-temp"));
    

C'est la méthode que nous utiliserons pour cet exercice.

{{start_solution}}

public class StubMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        final int MISSING = 9999;
        String line = value.toString();
        String station = line.substring(4, 10);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt ne reconnait pas le signe '+'
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(station), new TemperatureAveragingPair(airTemperature, 1));
        }
    }
}
{{end_solution}}

{{start_solution}}

public class StubMapper2 extends Mapper<Text, IntWritable, IntWritable, Text> {

    @Override
    public void map(Text station, IntWritable avgTemp, Context context)
            throws IOException, InterruptedException {

        context.write(new IntWritable(avgTemp.get()/50), new Text(station.toString()));
    }
}
{{end_solution}}

{{start_solution}}

public class StubReducer2 extends Reducer<IntWritable, Text, IntWritable, IntWritable> {

    @Override
    public void reduce(IntWritable avgTemp, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        int count=0;
        for(Text station : values) count ++;
        context.write(avgTemp, new IntWritable(count));

    }
}
{{end_solution}}

{{start_solution}}

public class StubDriver {

  public static void main(String[] args) throws Exception {

    /*
     * Validate that two arguments were passed from the command line.
     */
    if (args.length != 2) {
      System.out.printf("Usage: StubDriver <input dir> <output dir>\n");
      System.exit(-1);
    }

    {
        Job job = new Job();
        job.setJarByClass(StubDriver.class);
        job.setJobName("Average temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputPath(job, new Path("file:///tmp/output-temp"));

        job.setMapperClass(StubMapper.class);
        job.setCombinerClass(StubCombiner.class);
        job.setReducerClass(StubReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TemperatureAveragingPair.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        boolean success = job.waitForCompletion(true);
        if(!success) System.exit(1);
    }
    {
        Job job = new Job();
        job.setJarByClass(StubDriver.class);
        job.setJobName("job2");

        job.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.setInputPaths(job, new Path("file:///tmp/output-temp"));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(StubMapper2.class);
        job.setReducerClass(StubReducer2.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        boolean success = job.waitForCompletion(true);
        if(!success) System.exit(1);
    }
    System.exit(0);
  }
}
{{end_solution}}

Pour, les moyennes par station sur les années 1901-1902, vous devez trouver:

029070  17
029500  36
029600  34
029720  47
029810  53
227070  15

Pour visualiser un SequenceFile:

hadoop fs -text data.seq

{{note(HADOOP CLUSTER GROUP 4)}}

Exercice 7.2

D'autres outils compatibles avec Hadoop permettent d'effectuer des enchaînements plus complexes de jobs MapReduce:
  • le "JobControl" qui représente un graphe de jobs à orchestrer.
  • l'outil "Apache Oozie" qui permet la gestion avancée de workflows de jobs (MapReduce, Pig, Hive, ...). Contrairement à JobControl, celui-ci soumet les jobs depuis le client. Oozie est un service tournant sur le cluster. Il permet par exemple la détection de failed jobs, le scheduling des tâches, etc...
  • À noter aussi que pour simplement chainer plusieurs Mappers ou plusieurs Reducers, on peut utiliser ChainMapper et ChainReducer

Exercice 7.3

Refaire l'exercice 7.1 en utilisant Apache Oozie.

{{start_solution}}

Il faut tout d'abord démarrer oozie depuis le Cloudera Manager.
Pour lancer un job, il faut créer un répertoire sur hdfs avec les éléments suivants :
- racine
-- workflow.xml
-- lib
---- fichier jar

Le fichier jar est le même que précédemment.
Un exemple de fichier workflow.xml est donné ci-dessous.

Il est également nécessaire de posséder, en local, un fichier workflow.properties, également donné ci-dessous.

Enfin, oozie s'exécute à l'aide de la commande suivante :
oozie job -oozie http://localhost:11000/oozie -config workflow.properties -run

Workflow.xml:
<workflow-app xmlns="uri:oozie:workflow:0.1" name="monWorkflow">
  <start to="job1"/>
  <!-- premier job map/reduce -->
  <action name="job1">
    <map-reduce>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <!-- On nettoie output/ au démarrage -->
      <prepare>
        <delete path="${nameNode}/user/${wf:user()}/outputtemp"/>
        <delete path="${nameNode}/user/${wf:user()}/output"/>
      </prepare>
      <configuration>
        <property>
          <name>mapred.reducer.new-api</name>
          <value>true</value>
        </property>
        <property>
          <name>mapred.mapper.new-api</name>
          <value>true</value>
        </property>
        <property>
          <name>mapreduce.map.class</name>
          <value>Stub7Mapper</value>
        </property>
        <property>
          <name>mapreduce.reduce.class</name>
          <value>Stub7Reducer</value>
        </property>
        <property>
          <name>mapred.output.key.class</name>
          <value>org.apache.hadoop.io.Text</value>
        </property>
        <property>
          <name>mapred.output.value.class</name>
          <value>org.apache.hadoop.io.DoubleWritable</value>
        </property>
        <property>
          <name>mapred.mapoutput.key.class</name>
          <value>org.apache.hadoop.io.Text</value>
        </property>
        <property>
          <name>mapred.mapoutput.value.class</name>
          <value>avg_opt.TemperatureAveragingPair</value>
        </property>
        <property>
          <name>mapred.input.dir</name>
          <value>/user/${wf:user()}/workspace2/all</value>
        </property>
        <property>
          <name>mapred.output.dir</name>
          <value>/user/${wf:user()}/outputtemp</value>
        </property>
      </configuration>
    </map-reduce>
    <ok to="job2"/>
    <error to="fail"/>
  </action>

  <!-- deuxième job map/reduce -->
  <action name="job2">
    <map-reduce>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <!--
      <prepare>
        <delete path="${nameNode}/user/${wf:user()}/output"/>
      </prepare>
      -->
      <configuration>
        <property>
          <name>mapred.reducer.new-api</name>
          <value>true</value>
        </property>
        <property>
          <name>mapred.mapper.new-api</name>
          <value>true</value>
        </property>
        <property>
          <name>mapreduce.map.class</name>
          <value>Stub7bMapper</value>
        </property>
        <property>
          <name>mapreduce.reduce.class</name>
          <value>Stub7bReducer</value>
        </property>
        <property>
          <name>mapred.output.key.class</name>
          <value>org.apache.hadoop.io.Text</value>
        </property>
        <property>
          <name>mapred.output.value.class</name>
          <value>org.apache.hadoop.io.IntWritable</value>
        </property>
        <property>
          <name>mapred.mapoutput.key.class</name>
          <value>org.apache.hadoop.io.Text</value>
        </property>
        <property>
          <name>mapred.mapoutput.value.class</name>
          <value>org.apache.hadoop.io.Text</value>
        </property>
        <property>
          <name>mapred.input.dir</name>
          <value>/user/${wf:user()}/outputtemp</value>
        </property>
        <property>
          <name>mapred.output.dir</name>
          <value>/user/${wf:user()}/output</value>
        </property>
      </configuration>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
  </action>

  <kill name="fail">
    <message>MapReduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
    </message>
  </kill>
  <end name="end"/>
</workflow-app>

workflow.properties:

nameNode=hdfs://localhost.localdomain:8020
jobTracker=localhost.localdomain:8021
oozie.wf.application.path=${nameNode}/user/${user.name}/oozie-workflow
{{end_solution}}

Exercice 8

Caculer et afficher, pour chaque station, la moyenne de la température par année, afin de déterminer l'évolution.

Exercice 9

Modifier l'un des jobs MapReduce créés lors des exercices précédents, afin de lui faire utiliser la classe ToolRunner qui facilite la configuration de votre job MapReduce.

public class MaxTemperatureDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // ...
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
        System.exit(exitCode);
    }
}