Database
 sql >> Base de données >  >> RDS >> Database

Que sont les flux séquentiels et parallèles en Java ?

Java peut paralléliser les opérations de flux pour tirer parti des systèmes multicœurs. Cet article fournit une perspective et montre comment le flux parallèle peut améliorer les performances avec des exemples appropriés.

Flux en Java

Un flux en Java est une séquence d'objets représentés comme un conduit de données. Il a généralement une source où se trouvent les données et une destination où il est transmis. Notez qu'un flux n'est pas un référentiel ; à la place, il opère sur une source de données telle qu'un tableau ou une collection. Les bits intermédiaires dans le passage sont en fait appelés le flux. Au cours du processus de transmission, le flux passe généralement par une ou plusieurs transformations possibles, telles que le filtrage ou le tri, ou il peut s'agir de tout autre processus opérant sur les données. Cela personnalise les données d'origine sous une forme différente, généralement, selon les besoins du programmeur. Par conséquent, un nouveau flux est créé en fonction de l'opération qui lui est appliquée. Par exemple, lorsqu'un flux est trié, il en résulte un nouveau flux qui produit un résultat qui est ensuite trié. Cela signifie que les nouvelles données sont une copie transformée de l'original plutôt que d'être sous la forme originale.

Flux séquentiel

Toute opération de flux en Java, à moins qu'elle ne soit explicitement spécifiée comme parallèle, est traitée de manière séquentielle. Ce sont essentiellement des flux non parallèles qui utilisent un seul thread pour traiter leur pipeline. Les flux séquentiels ne tirent jamais parti du système multicœur même si le système sous-jacent peut prendre en charge l'exécution parallèle. Que se passe-t-il, par exemple, lorsque nous appliquons le multithreading pour traiter le flux ? Même dans ce cas, il fonctionne sur un seul cœur à la fois. Cependant, il peut sauter d'un noyau à un autre à moins qu'il ne soit explicitement épinglé à un noyau spécifique. Par exemple, le traitement dans quatre threads différents par rapport à quatre cœurs différents est évidemment différent là où le premier ne correspond pas au second. Il est tout à fait possible d'exécuter plusieurs threads dans un seul environnement de base, mais le traitement parallèle est un genre complètement différent. Un programme doit être conçu pour la programmation parallèle en plus de s'exécuter dans un environnement qui le prend en charge. C'est la raison pour laquelle la programmation parallèle est une arène complexe.

Essayons un exemple pour illustrer davantage l'idée.

package org.mano.example;

import java.util.Arrays;
import java.util.List;

public class Main2 {
   public static oid main(String[] args) {
      List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9);
      list.stream().forEach(System.out::println);
      System.out.println();
      list.parallelStream().forEach(System.out::println);
   }
}

Sortie

123456789
685973214

Cet exemple est une illustration de q flux séquentiel ainsi que q flux parallèle en fonctionnement. Le list.stream() fonctionne en séquence sur un seul thread avec le println() opération. list.parallelStream() , d'autre part, est traité en parallèle, tirant pleinement parti de l'environnement multicœur sous-jacent. L'aspect intéressant est dans la sortie du programme précédent. Dans le cas d'un flux séquentiel, le contenu de la liste est imprimé dans une séquence ordonnée. La sortie du flux parallèle, en revanche, n'est pas ordonnée et la séquence change à chaque exécution du programme. Cela signifie au moins une chose :cette invocation de list.parallelStream() la méthode rend le println instruction fonctionne dans plusieurs threads, quelque chose qui list.stream() fait dans un seul fil.

Flux parallèle

La principale motivation derrière l'utilisation d'un flux parallèle est de faire du traitement de flux une partie de la programmation parallèle, même si l'ensemble du programme peut ne pas être parallélisé. Le flux parallèle tire parti des processeurs multicœurs, ce qui entraîne une augmentation substantielle des performances. Contrairement à toute programmation parallèle, elles sont complexes et sujettes aux erreurs. Cependant, la bibliothèque de flux Java offre la possibilité de le faire facilement et de manière fiable. L'ensemble du programme peut ne pas être parallélisé. mais au moins la partie qui gère le flux peut être parallélisée. Ils sont en fait assez simples dans le sens où nous pouvons invoquer quelques méthodes et le reste est pris en charge. Il y a plusieurs façons de le faire. Une telle façon est d'obtenir un flux parallèle en appelant le parallelStream() méthode définie par Collection . Une autre façon est d'invoquer le parallel() méthode définie par BaseStream sur un flux séquentiel. Le flux séquentiel est parallélisé par l'invocation. Notez que la plate-forme sous-jacente doit prendre en charge la programmation parallèle, comme avec un système multicœur. Sinon, l'invocation ne sert à rien. Le flux serait traité en séquence dans un tel cas, même si nous avons fait l'invocation. Si l'invocation est faite sur un flux déjà parallèle, il ne fait rien et renvoie simplement le flux.

Pour garantir que le résultat du traitement parallèle appliqué au flux est le même que celui obtenu par le traitement séquentiel, les flux parallèles doivent être sans état, sans interférence et associatifs.

Un exemple rapide

package org.mano.example;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class Main {

   public static void main(String[] args) {
      List<Employee> employees = Arrays.asList(
         new Employee(1276, "FFF",2000.00),
         new Employee(7865, "AAA",1200.00),
         new Employee(4975, "DDD",3000.00),
         new Employee(4499, "CCC",1500.00),
         new Employee(9937, "GGG",2800.00),
         new Employee(5634, "HHH",1100.00),
         new Employee(9276, "BBB",3200.00),
         new Employee(6852, "EEE",3400.00));

      System.out.println("Original List");
      printList(employees);

      // Using sequential stream
      long start = System.currentTimeMillis();
      List<Employee> sortedItems = employees.stream()
         .sorted(Comparator
            .comparing(Employee::getName))
         .collect(Collectors.toList());
      long end = System.currentTimeMillis();

      System.out.println("sorted using sequential stream");
      printList(sortedItems);
      System.out.println("Total the time taken process :"
         + (end - start) + " milisec.");

      // Using parallel stream
      start = System.currentTimeMillis();
      List<Employee> anotherSortedItems = employees
         .parallelStream().sorted(Comparator
            .comparing(Employee::getName))
         .collect(Collectors.toList());
      end = System.currentTimeMillis();

      System.out.println("sorted using parallel stream");
      printList(anotherSortedItems);
      System.out.println("Total the time taken process :"
         + (end - start) + " milisec.");


      double totsal=employees.parallelStream()
         .map(e->e.getSalary())
         .reduce(0.00,(a1,a2)->a1+a2);
      System.out.println("Total Salary expense: "+totsal);
      Optional<Employee> maxSal=employees.parallelStream()
         .reduce((Employee e1, Employee e2)->
         e1.getSalary()<e2.getSalary()?e2:e1);
      if(maxSal.isPresent())
         System.out.println(maxSal.get().toString());
   }

   public static void printList(List<Employee> list) {
      for (Employee e : list)
         System.out.println(e.toString());
   }
}


package org.mano.example;

public class Employee {
   private int empid;
   private String name;
   private double salary;

   public Employee() {
      super();
   }

   public Employee(int empid, String name,
         double salary) {
      super();
      this.empid = empid;
      this.name = name;
      this.salary = salary;
   }

   public int getEmpid() {
      return empid;
   }

   public void setEmpid(int empid) {
      this.empid = empid;
   }

   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }

   public double getSalary() {
      return salary;
   }

   public void setSalary(double salary) {
      this.salary = salary;
   }

   @Override
   public String toString() {
      return "Employee [empid=" + empid + ", name="
         + name + ", salary=" + salary + "]";
   }
}

Dans le code précédent, notez comment nous avons appliqué le tri sur un flux en utilisant l'exécution séquentielle.

List<Employee> sortedItems = employees.stream()
               .sorted(Comparator
               .comparing(Employee::getName))
               .collect(Collectors.toList());

et l'exécution parallèle est obtenue en modifiant légèrement le code.

List<Employee> anotherSortedItems = employees
               .parallelStream().sorted(Comparator
               .comparing(Employee::getName))
               .collect(Collectors.toList());

Nous comparerons également le temps système pour avoir une idée de la partie du code qui prend le plus de temps. L'opération parallèle commence une fois que le flux parallèle est explicitement obtenu par le parallelStream() méthode. Il existe une autre méthode intéressante, appelée reduce() . Lorsque nous appliquons cette méthode à un flux parallèle, l'opération peut se produire dans différents threads.

Cependant, nous pouvons toujours basculer entre parallèle et séquentiel selon les besoins. Si nous voulons changer le flux parallèle en séquentiel, nous pouvons le faire en appelant le sequential() méthode spécifiée par BaseStream . Comme nous l'avons vu dans notre premier programme, l'opération effectuée sur le flux peut être ordonnée ou non selon l'ordre des éléments. Cela signifie que l'ordre dépend de la source de données. Ceci, cependant, n'est pas le cas dans le cas des flux parallèles. Pour augmenter les performances, ils sont traités en parallèle. Parce que cela se fait sans aucune séquence, où chaque partition du flux est traitée indépendamment des autres partitions sans aucune coordination, la conséquence est imprévisible et non ordonnée. Mais, si nous voulons spécifiquement effectuer une opération sur chaque élément du flux parallèle à ordonner, nous pouvons considérer le forEachOrdered() méthode, qui est une alternative à la méthode forEach() méthode.

Conclusion

Les API de flux font partie de Java depuis longtemps, mais l'ajout de la modification du traitement parallèle est très accueillant, et en même temps une fonctionnalité assez intrigante. Cela est particulièrement vrai parce que les machines modernes sont multicœurs et il y a une stigmatisation que la conception de la programmation parallèle est complexe. Les API fournies par Java offrent la possibilité d'incorporer une touche de réglages de programmation parallèle dans un programme Java qui a la conception globale d'une exécution séquentielle. C'est peut-être la meilleure partie de cette fonctionnalité.