John Jiang

a cup of Java, cheers!
https://github.com/johnshajiang/blog

   :: 首页 ::  :: 联系 :: 聚合  :: 管理 ::
  131 随笔 :: 1 文章 :: 530 评论 :: 0 Trackbacks
利用Java SE 8流处理数据
-- 使用Java流操作去表达复杂的数据查询

本文是Java Magazine 201403/04刊中的一篇文章,也是文章系列"利用Java SE 8流处理数据"中的第一篇,它概述了Java流的基本原理与基本应用,是一篇很好的Java Streams API的入门文章。(2014.07.27最后更新)

    没有集合对象,你会怎么样?几乎每个Java应用都会创建并处理集合。它们是许多编程任务的基础:集合使你能够对数据进行分组和处理。例如,你也许会创建一个关于银行交易的集合,该集合代表了某个用户的银行流水清单。然后,你可能想要处理整个集合去算出该用户花了多少钱。尽管数据处理十分重要,但Java在此方面表现的远不完美。
    首先,典型的集合处理模式与类SQL操作相似,诸如"查找"(如找出最大金额的那笔交易)或者"分组"(如,将与购买杂货相关的交易进行分组)。大部分数据库允许你以声明的形式去指定这些操作。例如,后面的SQL查询会让你找到那笔最大金额交易的ID:"SELECT id, MAX(value) from transctions"。
    如你所见,我们并不需要去实现如何计算最大值(例如,使用循环和一个变量去追踪这个最大值)。我们仅需要表达什么是我们想要的。这种基本思想就意味着,你不太需要担心去显式地实现这些查询--它们已经为你处理好了。为什么我们不能在处理集合时也这样做呢?你发现自己有多少次都是在一遍又一遍地使用循环去重复实现这些操作呢?
    其次,我们如何才能更高效地去处理大型集合?理想情况下,在加速进行处理时,你会想到利用多核架构。然而,编写并行程序既困难又容易出错。
    Java SE 8赶来帮忙了!Java API的设计者们在升级API时引入了一种新的称之为Java流(流)的抽象,它允许你以声明形式去处理数据。另外,Java流可以利用到多核架构而不必编写一行多线程代码。听起来不错,不是吗?这就是本文章系列所要探究的主题。
    Java流能为我们做些什么呢?在探究这些细节之前,让我们先看一个例子,这样你才能对这种新的使用Java SE 8 Java流的编程风格有感觉。假设我们要找到所有类型为grocery的交易并返回它们的ID列表,并按交易金额的递减顺序对该列表进行排序。在Java SE 7中,我们应该会把清单1所示的程序那样去做。而在Java SE 8中,我们则会像清单2所示的那样去实现。
清单1
List<Transaction> groceryTransactions = new Arraylist<>();
for(Transaction t: transactions){
  
if(t.getType() == Transaction.GROCERY){
    groceryTransactions.add(t);
  }
}
Collections.sort(groceryTransactions, 
new Comparator(){
  
public int compare(Transaction t1, Transaction t2){
    
return t2.getValue().compareTo(t1.getValue());
  }
});
List
<Integer> transactionIds = new ArrayList<>();
for(Transaction t: groceryTransactions){
  transactionsIds.add(t.getId());
}

清单2
List<Integer> transactionsIds =
    transactions.stream()
                .filter(t 
-> t.getType() == Transaction.GROCERY)
                .sorted(comparing(Transaction::getValue).reversed())
                .map(Transaction::getId)
                .collect(toList());

    图1形象地解释了那段Java SE 8程序。首先,我们调用List对象中的Java流()方法从交易列表(数据)中获取一个Java流对象。然后,多个操作(过滤,排序,映射,归集)链接在一起形成了一条线,这条线可以被看作构成了一条数据查询。


    那么如何并行地执行该程序呢?在Java SE 8中这很简单:只需要使用parallelJava流()方法去替换Java流()方法,如清单3所示。Java流 API会在内部对你的查询进行解构,并利用上你机器中的多核处理器。
清单3
List<Integer> transactionsIds =
    transactions.parallelStream()
                .filter(t 
-> t.getType() == Transaction.GROCERY)
                .sorted(comparing(Transaction::getValue).reversed())
                .map(Transaction::getId)
                .collect(toList());

    在该关于Java SE 8 Java流的文章系列结束时,你将能够使用Java流 API编写出像清单3那样的功能强大的查询程序。

Java流入门
    让我们先从一点理论开始。Java流的定义是什么?一个简短的定义就是"来自于一个数据源的能够支持聚合操作的一串元素"。让我们把它拆开来说:
    一串元素:Java流为一串特定类型值的集合提供了一个接口。然后,Java流实际上并不存储元素,它们会在需要时被用上。
    数据源:Java流要使用一个提供数据的源,诸如集合对象,数组或I/O资源。
    聚合操作:Java流支持类SQL的操作,以及来自于函数编程语言的通用操作,诸如过滤,映射,归一,查找,匹配,排序,等等。
    另外,与集合操作非常不同的是,Java流操作拥有两项基本特质:
    管道:许多Java流操作会返回它们自己,这就使得这些操作能够链接在一起以组成一个大型管道。这样就可以进行一些诸如惰性和短路之类的优化,后面我们会进行探究。
    内部遍历:集合是显式地进行遍历(外部遍历),但不同于集合,Java流是在幕后进行遍历。让我们重新看看之前的示例代码来解释这些原理。图2形象地解释了清单2的更多细节。


    首先,通过调用Java流()方法,我们从交易列表中得到了一个Java流对象。那么数据源就是交易列表,它将向Java流中提供一串元素。然后,我们对该Java流应用了一系列的聚合操作:过滤(提供一个谓语去过滤元素),排序(提供一个比较器去对元素进行排序),以及映射(解析出信息)。所有的操作都会返回该Java流,以便能够链接这些操作去组成一个管道,这可被看作是对数据源的一个查询。
    在调用collect()操作之前,没有实际工作会被执行。collect()方法将开始处理这个管道以返回一个结果(某个不是Java流的对象,在此处,是一个List对象)。现在还不需要去关注collect()方法,我们会在以后的文章去一探究竟。此时,你会发现collect会将各种数据加工方法作为参数,将收集到的Java流元素归结为一个结果。此处,toList()就描述了一个将Java流对象转化为List对象的加工方法。
    在探究与Java流有关的各个方法之前,最好是停下来深入思考一下Java流和集合之间观念上的不同之处。

Java流 vs. 集合
    已有的Java集合概念与新的Java流概念都为一串元素提供了接口。那它们有何不同吗?简单地说,集合是关于数据的,而Java流是关于计算的。
    想想这种情况,一部存储在DVD中的电影。这就是一个集合(可能是字节,可能是帧--在此处,我们不必关心这些),因为它包含有全部的数据结构。现在再想想这种情况,这部电影被转化成了数据流,通过互联网去观看它。此时它就是一个(字节或帧的)流。流视频播放器只需要下载一些晚于用户当前所观看位置的帧就可以了。这样,你就可以在大部分值被计算出来之前先展示流开头处的值(想想流化一场现场直播的足球比赛)。
    粗看之,集合与流的区别就是与何时处理数据有关。集合是内存中的数据结构,它包含有当前数据结构中的全部值--将所有元素加入到集合之前,必须先对所有元素进行处理,相反地,Java流只是逻辑上固定的数据结构,它里面的元素只会根据需要进行处理。
    使用Collection接口,要求用户实现遍历(例如,使用增强的for循环,即foreach);这被称之为外部循环。相反地,Stream类库使用内部遍历--它已经为你实现好了遍历,它会关心存储流的结果值的位置;你仅需要提供一个函数告诉它要做些什么就行了。清单4(对集合的外部遍历)和清单5(对Java流的内部遍历)中的代码形象地展示了这一不同之处。
清单4
List<Integer> transactionIds = new ArrayList<>();
for(Transaction t: transactions){
    transactionIds.add(t.getId());
}

清单5
List<Integer> transactionIds =
    transactions.stream()
                .map(Transaction::getId)
                .collect(toList());

    在清单4中,我们显式且顺序地遍历了交易列表,抽取了每个交易ID,然后将它加到一个收集器中。相反地,当使用流时,没有显式的遍历。清单5中的代码构建了一个查询,其中的map操作被设定为一个参数,它会抽取交易ID,然后collect操作会把结果Stream对象转化成一个List对象。
你现在应该知道什么是Java流,以及如何去使用它。现在让我们看看Java流所支持的操作之间的区别,这样你就能构建自己的数据查询了。

Java流操作:使用流去处理数据

    java.util.stream.Stream接口定义了许多操作,它们可被归集为两类。在图1所示的例子中,你可以看到如下操作:
    过滤,排序和映射,它们可被连接在一起组成一个管道
    收集,它关闭了这个管道并返回结果
    能够被连接在一起的Java流操作被称为中间操作。这些操作之所以能被连接在一起,是因为它们都会返回Stream对象。这些操作从这个管道中返回结果,结果的类型可以是List,Integer,甚至是void(任何Stream以外的类型)
    你也许很好奇为什么这种区别很重要。是这样的,在这个Java流管道的最终操作被调用之前,中间操作并不会执行任何处理;它们是"惰性"方法。这是因为中间方法经常会被"合并",在最终操作中它们会被合成为单一的执行路径。
清单6
List<Integer> numbers = Arrays.asList(12345678);
List
<Integer> twoEvenSquares =
    numbers.stream()
           .filter(n 
-> {
                    System.out.println(
"filtering " + n);
                    
return n % 2 == 0;
                  })
           .map(n 
-> {
                    System.out.println(
"mapping " + n);
                    
return n * n;
                  })
           .limit(
2)
           .collect(toList());

    例如,思考下清单6中的程序,它是从给定的数列中计算奇数的平方。你可能会很惊讶,它打印出如下结果:
filtering 1
filtering 
2
mapping 
2
filtering 
3
filtering 
4
mapping 
4

    这是因为limit(2)使用了短路;我们只需要处理流的一部分,而不是全部,去得到一个结果。这就类似于测评一个由and操作符关联起来的大型布尔表达式链:一旦某个表达式返回了false,那么就可以认为整个表达式链就是false,而不必测评所有的表达式了。在这个例子中,limit()方法将返回的Java流的长度限定为2。另外,filter与map被合并在了同一条执行路径中了。

    归纳一下到目前为止,在使用Java流时我们所学到的内容,总言之,涉及三个方面:
    一个数据源(例如一个集合),对它执行查询
    一个中间操作的链,它组成一个流的管道
    一个最终操作,它执行流的管道并产生结果
    现在让我们看看Java流所支持的一些操作。参考java.util.stream.Stream接口可以得到这些方法的完整清单,再看看本文末尾所给出的资源,它包含有更多的例子。

    过滤。有多种方法可以对流中的元素进行过滤:
    filter(Predicate):使用一个谓语(java.util.function.Predicate)作为参数,它会返回一个包含所有匹配给定谓语条件元素的Java流。
    distinct:返回一个包含有唯一元素的Java流。
    limit(n):返回的流的长度不能超过n。
    skip(n):返回的流将不包括前n个元素。

    查找与匹配。一个通用的数据处理模式就要考虑一些元素是否匹配给定的属性。你可以使用anyMatch,allMatch和noneMatch方法帮你做到这一点。这些方法都会使用一个谓语参数并返回boolean值作为结果(所以,它们是最终操作)。例如,使用allMatch去查出交易流中所有金额大于100的交易,如清单7所示。
清单7
boolean expensive =
    transactions.stream()
                .allMatch(t 
-> t.getValue() > 100);

    另外,Stream接口提供了方法findFirst和findAny,以取出流中任一元素。它们可以与其它的流操作,如filter,结合起来使用。findFirst和findAny都会返回一个Optinal对象(见清单8)。
清单8
Optional<Transaction> =
    transactions.stream()
                .findAny(t 
-> t.getType() == Transaction.GROCERY);

    Optional<T>类(java.util.Optional)是一个容器类,它代表一个存在或不存在的值。清单8中的程序,findAny方法可能没有找到任何类型为grocery的交易。Optional类包含多个方法去测试一个元素是否存在。例如,如果交易存在,通过使用ifPresent方法,我们可以选择一个操作去应用这个Optaional对象,如清单9所示(此处只是打印交易)。
清单9
transactions.stream()
              .findAny(t 
-> t.getType() == Transaction.GROCERY)
              .ifPresent(System.out::println);

    映射。Java流支持map方法,它使用一个函数(java.util.function.Function)作为参数,将流元素投影到其它形式。这个函数会被应用到每个元素,并将元素"映射"到新的元素。
    例如,你可能会想到使用它去抽取流中每个元素的信息。在清单10的例子中,我们返回了一个列表中每个字的长度。
清单10
List<String> words = Arrays.asList("Oracle""Java""Magazine");
 List
<Integer> wordLengths =
    words.stream()
         .map(String::length)
         .collect(toList());

    归一。到目前为止,我们已见过的最终操作会返回boolean(allMatch等等),void(forEach)或Optaional对象(findAny等等)。我们也使用collect方法将Stream对象中的所有元素放到一个List对象中。
    然而,你也可以将流中的元素放到一个查询中,该查询可表达更为复杂的数据处理,例如"拥有最大ID"或者"算出所以交易金额的和"。这就可能对Java流用上reduce方法,该方法会对每个元素重复地应用一个操作(例如,加上两个数字),直到生成结果。在函数式编程中,这常被称为折叠操作。因为该操作可被看作重复地"折叠"一张很长的纸(Stream对象),直到这张纸的面积变得只有一点儿了。这就是折叠操作的结果。
    看看我们是如何使用循环去计算一个组数字的和会有助于理解这个问题:
int sum = 0;
for (int x : numbers) {
  sum 
+= x;
}

    列表中的每一个数字元素都被迭代地组合在一起,并使用一个额外的操作符去产生结果。本质上,我们就是把一组数字"归一"成一个数字。在这段代码中有两个参数:数字和变量的初始值,即该例中的0,以及用于合并所有元素的操作符,即本例中的+。
清单11
int sum = numbers.stream().reduce(0, (a, b) -> a + b);

    对Java流使用reduce方法,我们可以计算出流中的所有元素值之和,如清单11所示。reduce方法使用两个参数:
    初始值,0
    BinaryOperation<T>,合并两个元素,并产生一个新值
    reduce方法本质上就是重复应用模式的抽象。其它的查询,如"计算产量"或"计算最大值"(如清单12所示)则是reduce方法的特别实例。
清单12
int product = numbers.stream().reduce(1, (a, b) -> a * b);
int product = numbers.stream().reduce(1, Integer::max);

数字流
    你已经看到可以使用reduce方法去计算整数流的和。然后,这也是有成本的:我们重复执行了许多拆箱操作以将Integer对象加到一起。如果我们能调用一个sum方法,使程序的意图更为明显,就像清单13那样,岂不是更好?
清单13
int statement =
    transactions.stream()
                .map(Transaction::getValue)
                .sum(); 
// error since Stream has no sum method

    Java 8引入的三个特定的基本数据类型的流接口来应对这个问题--IntStream,DoubleStream和LongStream--它们专注于元素分别为int,double和long型的Java流。将一个流转化为特定类型的流,你最常使用的方法就是mapToInt,mapToDouble和mapToLong。这些方法与我们较早前看到的map方法是一样的,但它们会返回特定类型的Stream对象,而不是Stream<T>对象。例如,我们可以改进下清单13中的代码,如清单14所示那样。你也可以使用装箱操作将一个基本数据类型的流转化成一个使用包装对象的流。
清单14
int statementSum =
    transactions.stream()
                .mapToInt(Transaction::getValue)
                .sum(); 
// works!

    最后,数字流的另一种有用的形式是数字区间。比如,你可能想生成介于1到100之间的所有数字。为了帮助生成这种区间,Java SE 8在IntStream,DoubleStream和LongStream中分别引入了两个静态方法:range和rangeClosed。
    这两个方法都会使用两个参数,第一个参数是起始值,第二个参数是终止值。但是range方法生成的区间不会包含终止值本身,但rangeClosed生成的区间则会包含。清单15是一个使用rangeClosed方法的例子,它返回一个包含有全部介于10到30之间奇数的流。
清单15
IntStream oddNumbers =
    IntStream.rangeClosed(
1030)
             .filter(n 
-> n % 2 == 1);

构建流
    有多种途径可以去构建一个流。你已经看过如何从集合对象中构建流。另外,我们还操控过数字流。你也可以从值,数组或文件中去创建流。另外,你甚至于可以从一个函数中生成无限流。
    可以直截了当地从值或数组中创建流:只需要使用一些静态方法即可,对于值,是Stream.of();而对于数组,则要调用Arrays.stream()。如清单16所示。
清单16
Stream<Integer> numbersFromValues = Stream.of(1234);
int[] numbers = {1234};
IntStream numbersFromArray 
= Arrays.stream(numbers);

    你也可以将一个文件转化为其内容行的流,使用静态方法Files.lines()即可。清单17就使用该方法计算了文件中行的数量。
清单17
long numberOfLines =
    Files.lines(Paths.get(“yourFile.txt”), Charset.defaultCharset())
         .count();

    无限流。最后,在总结本文之前,有一个令人非常兴奋的主意。到现在为止,你应该理解到流中的元素是按需生成的。有两个静态方法--Stream.iterate()和Stream.generate()--可以让你从一个函数中创建流。然而,因为被使用的元素是按需生成的,所以这两个方法可以"永远地"生成元素。这就是为什么我们称它为无限流:它就是没有固定大小的流,但它做的事情与一个从固定集合生成的流是一样的。
    清单18就是一个使用iterate方法的例子,它会包含10的所有倍数。iterate方法使用一个起始值(此处的0)和一个Lambda表达式(类型为UnaryOperator<T>)去顺序地生成每一个新值。
清单18
Stream<Integer> numbers = Stream.iterate(0, n -> n + 10);

    我们也可以使用limit方法,以从一个无限流中得到一个固定流。如清单19所示,可以将流的长度限制为5。
清单19
numbers.limit(5).forEach(System.out::println); // 0, 10, 20, 30, 40

结论
    Java SE 8引入了Streams API,它让你能够表达更为复杂的数据处理查询。在本文中,你已见到流可以支持许多操作,诸如过滤,映射,归一和迭代,把它们结合在一起可以写出简洁的、更富表现力的数据处理查询。这种新的编程方法远不同于Java SE 8之前的集合处理。但是,它有许多好处。首先,它利用到了诸如惰性或短路这样的技术,以优化数据处理查询的性能。其次,能够自动地利用上多核架构,以并行地处理流。在本文章系统的第二部分中,我们将探索更高级的操作,例如flatMap和collect。请继续关注。
posted on 2014-07-27 20:54 John Jiang 阅读(4565) 评论(6)  编辑  收藏 所属分类: JavaSEJava翻译

评论

# re: 利用Java SE 8流处理数据(I)(译) 2014-07-30 22:08 月小升技术博客
我们搞技术的,玩不过他们,年年升级,我们年年要学习。。  回复  更多评论
  

# re: 利用Java SE 8流处理数据(I)(译) 2014-07-31 10:00 Sha Jiang
@月小升技术博客
这没办法...人家是创新,我们是应用@_@  回复  更多评论
  

# re: 利用Java SE 8流处理数据(I)(译) 2014-08-04 10:28 创业成功
不大看得懂.  回复  更多评论
  

# re: 利用Java SE 8流处理数据(I)(译)[未登录] 2014-08-11 16:24 xin
清单4的transactionId类型怎么变成了String,清单6的变量d应该是n吧  回复  更多评论
  

# re: 利用Java SE 8流处理数据(I)(译)[未登录] 2014-08-11 16:27 xin
toList是使用import static java.util.stream.Collectors.toList;了吧  回复  更多评论
  

# re: 利用Java SE 8流处理数据(I)(译) 2014-08-12 10:14 Sha Jiang
@xin
谢谢指正  回复  更多评论
  


只有注册用户登录后才能发表评论。


网站导航: