Mysql
 sql >> Base de données >  >> RDS >> Mysql

Stockage de la sortie de données Apache Hadoop dans la base de données Mysql

Le bon exemple est montré sur ce blog , j'ai essayé et ça se passe très bien. Je cite les parties les plus importantes du code.

Dans un premier temps, vous devez créer une classe représentant les données que vous souhaitez stocker. La classe doit implémenter l'interface DBWritable :

public class DBOutputWritable implements Writable, DBWritable
{
   private String name;
   private int count;

   public DBOutputWritable(String name, int count) {
     this.name = name;
     this.count = count;
   }

   public void readFields(DataInput in) throws IOException {   }

   public void readFields(ResultSet rs) throws SQLException {
     name = rs.getString(1);
     count = rs.getInt(2);
   }

   public void write(DataOutput out) throws IOException {    }

   public void write(PreparedStatement ps) throws SQLException {
     ps.setString(1, name);
     ps.setInt(2, count);
   }
}

Créez des objets de classe précédemment définie dans votre Reducer :

public class Reduce extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable> {

   protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
     int sum = 0;

     for(IntWritable value : values) {
       sum += value.get();
     }

     try {
       ctx.write(new DBOutputWritable(key.toString(), sum), NullWritable.get());
     } catch(IOException e) {
       e.printStackTrace();
     } catch(InterruptedException e) {
       e.printStackTrace();
     }
   }
}

Enfin, vous devez configurer une connexion à votre base de données (n'oubliez pas d'ajouter votre connecteur db sur le classpath) et enregistrer les types de données d'entrée/sortie de votre mappeur et de votre réducteur.

public class Main
{
   public static void main(String[] args) throws Exception
   {
     Configuration conf = new Configuration();
     DBConfiguration.configureDB(conf,
     "com.mysql.jdbc.Driver",   // driver class
     "jdbc:mysql://localhost:3306/testDb", // db url
     "user",    // username
     "password"); //password

     Job job = new Job(conf);
     job.setJarByClass(Main.class);
     job.setMapperClass(Map.class); // your mapper - not shown in this example
     job.setReducerClass(Reduce.class);
     job.setMapOutputKeyClass(Text.class); // your mapper - not shown in this example
     job.setMapOutputValueClass(IntWritable.class); // your mapper - not shown in this example
     job.setOutputKeyClass(DBOutputWritable.class); // reducer's KEYOUT
     job.setOutputValueClass(NullWritable.class);   // reducer's VALUEOUT
     job.setInputFormatClass(...);
     job.setOutputFormatClass(DBOutputFormat.class);

     DBInputFormat.setInput(...);

     DBOutputFormat.setOutput(
     job,
     "output",    // output table name
     new String[] { "name", "count" }   //table columns
     );

     System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}