Project

General

Profile

{{show_solution}}

HBase

Certains traitements ne nécessitent qu'un petit sous-ensemble des données stockées, et donc l'utilisation d'un index pour accéder plus rapidement à ce sous-ensemble. C'est ce qu'apporte HBase.

Pré-requis

Vérifier par le Cloudera Manager (pkill firefox ; firefox &) que les services Zookeeper et HBase sont lancés (sinon, les lancer).
Si le service HBase Master ne démarre pas, il faut éventuellement Créer le répertoire racine (voir les actions du service HBase).

Les concepts

Entrer dans le shell de HBase:

hbase shell

Effectuer les actions suivantes en vous aidant de la commande help 'ddl' et de la commande help 'dml':
  1. Créer une table 'test' avec une famille de colonnes 'data'. On utilisera cette table pour chaque étape de cet exercice.
  2. Vérifier que la table a bien été créée en listant les tables de la base.
  3. Modifier la famille de colonnes 'data' pour qu’elle affiche jusqu’à 4 versions des valeurs de ses cellules.
  4. Vérifier que la modification a bien été prise en compte en affichant la description de la table.
  5. Ajouter la valeur 'un A' dans la cellule '001A'-'data:info' (ligne '001A' et colonne 'data:info').
  6. Ajouter la valeur 'deux A' dans la cellule '002A'-'data:info'.
  7. Ajouter une valeur quelconque dans la cellule '002A'-'data:details', puis modifier la valeur de cette cellule 4 fois de suite.
  8. Lister le contenu de la table.
  9. Afficher la ligne '002A'.
  10. Afficher la valeur courante de la cellule '002A'-'data:details'.
  11. Afficher la valeur précédente de la cellule '002A'-'data:details'.
  12. Afficher toutes les valeurs précédentes de la cellule '002A'-'data:details'.
  13. Ajouter la valeur 'deux B' dans la cellule '002B'-'data:info'.
  14. Ajouter la valeur 'trois A' dans la cellule '003A'-'data:info'.
  15. Lister les lignes dont la clef commence par '002'.
  16. Supprimer la cellule '002A'-'data:info' et lister le contenu de la table.
  17. Supprimer la famille de colonne 'data' de la table (à l’aide de 'alter').
  18. Supprimer la table 'test'.
{{start_solution}}
  1. create 'test', 'data'
  2. list
  3. disable 'test'
    alter 'test', NAME => 'data', VERSIONS => 4
    enable 'test'
  4. describe 'test'
  5. put 'test', '001A', 'data:info', 'un A'
  6. put 'test', '002A', 'data:info', 'deux A'
  7. put 'test', '002A', 'data:details', 'valeur 1'
    put 'test', '002A', 'data:details', 'valeur 2'
    put 'test', '002A', 'data:details', 'valeur 3'
    put 'test', '002A', 'data:details', 'valeur 4'
    put 'test', '002A', 'data:details', 'valeur 5'
  8. scan 'test'
  9. get 'test', '002A'
  10. get 'test', '002A', 'data:details'
  11. get 'test', '002A', {COLUMN => 'data:details', TIMERANGE => [0, timestamp]}
  12. get 'test', '002A', {COLUMN => 'data:details', TIMERANGE => [0, timestamp], VERSIONS => 4}
  13. put 'test', '002B', 'data:info', 'deux B'
  14. put 'test', '003A', 'data:info', 'trois A'
  15. scan 'test', {FILTER => "PrefixFilter('002')"}
    ou bien
    scan 'test', STARTROW => '002', ENDROW => '003'
  16. delete 'test', '002A', 'data:info'
    scan 'test'
  17. disable 'test'
    alter 'test', 'delete' => 'data'
    enable 'test'
    scan 'test'
  18. disable 'test'
    drop 'test'
    list {{end_solution}}

Import des données

Import avec l'API HBase

L'import de données peut se faire via un programme.

Comprendre la classe hbase.HBaseTemperatureImporterV1 (inutile de l'exécuter). S'inspirer de cette classe pour importer la liste des stations météo dans une table 'stations':
  • utiliser la classe NcdcStationMetadataParser pour parser chaque ligne
  • entre le nom et l'identifiant de la station, choisir lequel doit être la clef afin d'optimiser l'exécution pour l'exercice suivant.
  • notez que vous importez les données depuis un fichier et non plus depuis un répertoire.
  • l'exécution nécessite d'ajouter la bibliothèque hbase.jar dans le classpath local:
    export HADOOP_CLASSPATH=/usr/lib/hbase/hbase.jar
    hadoop jar tp.jar hbase.HBaseStationImporter stations-fixed-width.txt
    

{{start_solution}}

public class HBaseStationImporter extends Configured implements Tool {
    private static final byte[] FAMILY = Bytes.toBytes("data");
    private static final byte[] STATION_ID = Bytes.toBytes("id");

    public int run(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: HBaseStationImporter <input>");
            return -1;
        }

        HTable table = new HTable(HBaseConfiguration.create(getConf()), "stations");

        BufferedReader in = null;
        try {
            in = new BufferedReader(new InputStreamReader(new FileInputStream(args[0])));
            NcdcStationMetadataParser parser = new NcdcStationMetadataParser();
            String line;
            while ((line=in.readLine()) != null) {
                if (parser.parse(line)) {
                    Put put = new Put(Bytes.toBytes(parser.getStationName()));
                    put.add(FAMILY, STATION_ID, Bytes.toBytes(parser.getStationId()));
                    table.put(put);
                }
            }
        } finally {
            IOUtils.closeStream(in);
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(HBaseConfiguration.create(), new HBaseStationImporter(), args);
        System.exit(exitCode);
    }
}
{{end_solution}}

Import avec MapReduce

Si la quantité de données est conséquente, il vaut mieux utiliser un job MapReduce.

Comprendre et exécuter la classe hbase.HBaseTemperatureImporter afin d'importer les données météos dans une table 'records'.
  • la compilation des classes nécessite d'ajouter la bibliothèque /usr/lib/hbase/hbase.jar dans le ‘Java Build Path’ de votre projet Eclipse (Project -> Properties).
  • l'exécution nécessite d'ajouter cette bibliothèque dans les classpath locaux et distant:
    export HADOOP_CLASSPATH=/usr/lib/hbase/hbase.jar
    hadoop jar tp.jar hbase.HBaseTemperatureImporter -libjars $HADOOP_CLASSPATH hdfs:///user/cloudera/records2/sample.txt
    

Accès aux données

Accès avec l'API HBase

L'accès aux données se fait soit en Java via l'API de HBase, soit dans d'autres langages via Avro, REST ou Thrift.

L'exemple hbase.ListStations affiche les stations dont le nom commence par la chaîne de caractères passée en argument (par exemple 'PARIS').

Comprendre et exécuter cette classe, puis la modifier de façon a utiliser l'index de la table "stations". Bien que le jeu de données soit assez petit, vous observerez une différence de performance notable.

{{start_solution}}

public class ListStations2 extends Configured implements Tool {
    private static final byte[] FAMILY = Bytes.toBytes("data");
    private static final byte[] STATION_ID = Bytes.toBytes("id");

    public int run(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: ListStations2 <city_name>");
            return -1;
        }
        String name = args[0];
        String end = name.substring(0,name.length()-1) + (char)(name.charAt(name.length()-1)+1);

        HTable table = new HTable(HBaseConfiguration.create(getConf()), "stations");
        Scan scan = new Scan();
        scan.addFamily(FAMILY);
        scan.setStartRow(Bytes.toBytes(name));
        scan.setStopRow(Bytes.toBytes(end));
        ResultScanner scanner = table.getScanner(scan);
        try {
            for (Result scannerResult : scanner) {
                byte[] stationName = scannerResult.getRow();
                //if (Bytes.toString(stationName).startsWith(name)) {
                    byte[] stationId = scannerResult.getValue(FAMILY, STATION_ID);
                    if (stationId != null) {
                        System.out.printf("%s\t%s\n", Bytes.toString(stationName), Bytes.toString(stationId));
                    }
                //}
            }
        } finally {
            scanner.close();
            table.close();
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(HBaseConfiguration.create(), new ListStations2(), args);
        System.exit(exitCode);
    }
}
{{end_solution}}

Partez de votre classe ainsi modifiée pour écrire un programme qui affiche la température la plus haute relevée par les stations météo parisiennes, sans jamais parcourir l'ensemble des données des tables HBase 'stations' et 'records'.

{{start_solution}}

public class TemperatureMaxOfStation extends Configured implements Tool {
    private static final byte[] FAMILY = Bytes.toBytes("data");
    private static final byte[] STATION_ID = Bytes.toBytes("id");
    private static final byte[] TEMPERATURE = Bytes.toBytes("temperature");

    public int run(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: TemperatureMaxOfStation <city_name>");
            return -1;
        }
        String name = args[0];
        String end = name.substring(0,name.length()-1) + (char)(name.charAt(name.length()-1)+1);

        int max = Integer.MIN_VALUE;
        HTable table = new HTable(HBaseConfiguration.create(getConf()), "stations");
        HTable rTable = new HTable(HBaseConfiguration.create(getConf()), "records");
        Scan scan = new Scan();
        scan.addFamily(FAMILY);
        scan.setStartRow(Bytes.toBytes(name));
        scan.setStopRow(Bytes.toBytes(end));
        ResultScanner scanner = table.getScanner(scan);
        try {
            for (Result scannerResult : scanner) {
                byte[] stationName = scannerResult.getRow();
                //if (Bytes.toString(stationName).startsWith(name)) {
                    byte[] stationId = scannerResult.getValue(FAMILY, STATION_ID);
                    if (stationId != null) {
                        System.out.printf("%s\t%s\n", Bytes.toString(stationName), Bytes.toString(stationId));
                        byte[] stationIdEnd = new byte[stationId.length];
                        System.arraycopy(stationId, 0, stationIdEnd, 0, stationId.length);
                        stationIdEnd[stationIdEnd.length-1]++;

                        Scan rScan = new Scan();
                        rScan.addFamily(FAMILY);
                        rScan.setStartRow(stationId);
                        rScan.setStopRow(stationIdEnd);
                        ResultScanner rScanner = rTable.getScanner(rScan);
                        for (Result r : rScanner) {
                            byte[] temperature = r.getValue(FAMILY, TEMPERATURE);
                            if (temperature != null) {
                                int t = Bytes.toInt(temperature);
                                if (t > max) {
                                    max = t;
                                }
                            }
                        }
                    }
                //}
            }
        } finally {
            scanner.close();
            table.close();
            rTable.close();
        }
        System.out.println("max = "+max);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(HBaseConfiguration.create(), new TemperatureMaxOfStation(), args);
        System.exit(exitCode);
    }
}
{{end_solution}}

Accès avec MapReduce

Lorsque le traitement a effectuer nécessite de parcourir l'ensemble des données, l'utilisation de l'index devient alors contre-productive et il vaut mieux effectuer ce traitement avec un job MapReduce. HBase fourni une API permettant de parcourir les enregistrements d'une table HBase depuis un job MapReduce.

Comprendre et exécuter la classe hbase.RowCounter, puis s'en inspirer afin de calculer, pour chaque année, la moyenne des températures relevées.
  • Parmi les classes que vous avez écrites hier, lesquelles peuvent être réutilisées telles quelles?
  • Lesquelles doivent être modifiées pour lire les données dans la table HBase?

{{start_solution}}

public class StubMapper extends TableMapper<Text, IntWritable> {
    private static final byte[] FAMILY = Bytes.toBytes("data");
    private static final byte[] YEAR = Bytes.toBytes("year");
    private static final byte[] TEMPERATURE = Bytes.toBytes("temperature");

    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException, InterruptedException {
        byte[] year = values.getValue(FAMILY, YEAR);
        byte[] temperature = values.getValue(FAMILY, TEMPERATURE);
        if (year!=null && temperature!=null) {
            String y = Bytes.toString(year);
            int t = Bytes.toInt(temperature);
            context.write(new Text(y), new IntWritable(t));
        }
    }
}
{{end_solution}}

{{start_solution}}

public class StubDriver {
    public static void main(String[] args) throws Exception {
        if(args.length != 1) {
            System.err.println("Usage: StubDriver <output_directory>");
            System.exit(-1);
        }

        Job job = new Job(HBaseConfiguration.create(), "avg temp");
        job.setJarByClass(StubDriver.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        TableMapReduceUtil.initTableMapperJob(
                "records",            // input table
                new Scan(),            // input scan instance
                StubMapper.class,        // mapper class
                Text.class,             // type of output key
                IntWritable.class,         // type of output value
                job);
        job.setReducerClass(StubReducer.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
{{end_solution}}