Hadoop Streaming: Specifying Sort Fields and Separators

1. If no extra partitioner or comparator parameters are specified,
     the key defined by the stream.map.output.* properties is used both for partitioning and for sorting.

2. Let's look at an example. Suppose the mapper output is:
223515735|1405227460|RecFriend_show|270519295 TR|221622835 TR|224632830 TR|230866906 TR
223515735|1405227460|RecFriend_zforward
223515735|1405227461|RecFriend_show|222619835 TR|230838233 TR|238462124 TR|202544775 TR
223515735|1405227461|RecFriend_zforward
223515735|1405227507|RecFriend_show|229237722 TR|231814160 TR|241687505 TR|224517836 TR
223531575|1405254451|RecFeedYMK_show|224179432 TR|235884670 TR|316630389 TR|224789861 TR|228411232 TR
    If stream.map.output.field.separator and stream.num.map.output.key.fields are not specified, everything before the first "\t" defaults to the key and the rest is the value. After the mapper emits its output, the framework re-splits it according to these two parameters, and only then passes it to the reducer.
 (1) Taking the mapper output above as an example,
      if you specify the parameters  -D stream.map.output.field.separator='|'
                                     -D stream.num.map.output.key.fields=2
      then the first two "|"-separated fields become the key and the rest becomes the value,
      producing output in this format:
      223515735|1405227460    RecFriend_show|270519295 TR|221622835 TR|224632830 TR|230866906 TR
      223515735|1405227460    RecFriend_zforward
      223515735|1405227461    RecFriend_show|222619835 TR|230838233 TR|238462124 TR|202544775 TR
      223515735|1405227461    RecFriend_zforward

 (2) Suppose we want to partition by the uid in the first field, but have the reducer sort on the first three fields. How do we do that?
       cmd = "%s " % (HADOOP_JAR, ) + \
              " -D  mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator " + \
              " -D map.output.key.field.separator='|' " + \
              " -D mapred.text.key.partitioner.options='-k1,1'" + \
              " -D mapred.text.key.comparator.options='-k1,3' " + \
              " -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner" + \
              " -file %s" % (mapper_abs_path, ) + \
              " -mapper %s" % (mapper_abs_path, ) + \
              " -file %s" % (reducer_abs_path, ) + \
              " -reducer '%s %s'" % (reducer_abs_path, fdate) + \
              " -input %s" % (hadoop_input, ) + \
              " -output %s" % (hadoop_output, ) + \
              " -numReduceTasks %d" % (tasks);
     A few notes:
      *  If the two parameters from (1) are not specified, the framework defaults to everything before the first "\t" as the key; if there is no "\t", the whole line becomes the key.
      *  -D map.output.key.field.separator='|' specifies that the separator inside the key itself is "|".
      *  -D mapred.text.key.partitioner.options='-k1,1' specifies that partitioning uses the first field of the key (the uid).
      *  To choose your own partitioning fields, you must also pass -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner.
      *  -D mapred.text.key.comparator.options='-k1,3' specifies that fields 1 through 3 of the key are used for sorting.
      *  To choose your own sort fields, you must also pass:
           -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
      A small simulation of this partition/sort contract is sketched below.
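
To make the contract concrete, here is a small, hedged Python sketch. It only simulates what the options promise (bucket on field 1, sort on fields 1-3), not how Hadoop implements it; the records come from the mapper output above and num_reducers is an arbitrary illustration value.

#!/usr/bin/python
# Hedged simulation: partition like '-k1,1', sort like '-k1,3'.
records = [
    "223515735|1405227461|RecFriend_zforward",
    "223515735|1405227460|RecFriend_show",
    "223531575|1405254451|RecFeedYMK_show",
]
num_reducers = 3  # arbitrary illustration value

buckets = {}
for r in records:
    uid = r.split("|")[0]                      # '-k1,1': bucket on field 1 only
    buckets.setdefault(hash(uid) % num_reducers, []).append(r)

for bucket in sorted(buckets):
    for r in sorted(buckets[bucket], key=lambda r: r.split("|")[:3]):  # '-k1,3'
        print "reducer %d gets: %s" % (bucket, r)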


More usage examples

Customizing the Way to Split Lines into Key/Value Pairs

As noted earlier, when the Map/Reduce framework reads a line from the stdout of the mapper, it splits the line into a key/value pair. By default, the prefix of the line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value.

However, you can customize this default. You can specify a field separator other than the tab character (the default), and you can specify the nth (n >= 1) occurrence of the separator, rather than the first (the default), as the boundary between the key and the value. For example:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4 \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer

In the above example, "-D stream.map.output.field.separator=." specifies "." as the field separator for the map outputs, and the prefix up to the fourth "." in a line will be the key and the rest of the line (excluding the fourth ".") will be the value. If a line has fewer than four "."s, then the whole line will be the key and the value will be an empty Text object (like the one created by new Text("")).
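
This splitting rule is easy to state in code. Here is a minimal sketch of the documented rule (an illustration only, not Hadoop's actual implementation):

def split_key_value(line, sep=".", num_key_fields=4):
    # The prefix up to the num_key_fields-th separator is the key; with
    # fewer separators, the whole line is the key and the value is empty.
    parts = line.split(sep)
    if len(parts) <= num_key_fields:
        return line, ""
    return sep.join(parts[:num_key_fields]), sep.join(parts[num_key_fields:])

print split_key_value("a.b.c.d.e.f")   # -> ('a.b.c.d', 'e.f')
print split_key_value("a.b")           # -> ('a.b', '')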

Similarly, you can use "-D stream.reduce.output.field.separator=SEP" and "-D stream.num.reduce.output.fields=NUM" to specify the nth field separator in a line of the reduce outputs as the separator between the key and the value.

Similarly, you can specify "stream.map.input.field.separator" and "stream.reduce.input.field.separator" as the input separator for map/reduce inputs. By default the separator is the tab character.

A Useful Partitioner Class (secondary sort, the -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner option)

Hadoop has a library class, KeyFieldBasedPartitioner, that is useful for many applications. This class allows the Map/Reduce framework to partition the map outputs based on certain key fields, not the whole keys. For example:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4 \
    -D map.output.key.field.separator=. \
    -D mapred.text.key.partitioner.options=-k1,2 \
    -D mapred.reduce.tasks=12 \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

Here, -D stream.map.output.field.separator=. and -D stream.num.map.output.key.fields=4 work as explained in the previous example; streaming uses these two variables to identify the key/value pairs in the mapper output.

The map output keys of the above Map/Reduce job normally have four fields separated by ".". However, the Map/Reduce framework will partition the map outputs by the first two fields of the keys using the -D mapred.text.key.partitioner.options=-k1,2 option. Here, -D map.output.key.field.separator=. specifies the separator used to split the key into fields for partitioning. This guarantees that all the key/value pairs with the same first two fields in the keys will be sent to the same reducer.

This is effectively equivalent to specifying the first two fields as the primary key and the next two fields as the secondary. The primary key is used for partitioning, and the combination of the primary and secondary keys is used for sorting. A simple illustration is shown here:

Output of map (the keys):

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

Partition into 3 reducers (the first 2 fields are used as keys for partitioning):

11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2

Sorting within each partition for the reducer (all 4 fields used for sorting):

11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3

A Useful Comparator Class

Hadoop has a library class, KeyFieldBasedComparator, that is useful for many applications. This class provides a subset of features provided by the Unix/GNU Sort. For example:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4 \
    -D map.output.key.field.separator=. \
    -D mapred.text.key.comparator.options=-k2,2nr \
    -D mapred.reduce.tasks=12 \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer

The map output keys of the above Map/Reduce job normally have four fields separated by ".". However, the Map/Reduce framework will sort the outputs by the second field of the keys using the -D mapred.text.key.comparator.options=-k2,2nr option. Here, -n specifies numeric sorting and -r specifies that the result should be reversed. A simple illustration is shown below:

Output of map (the keys):

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

Sorted output for the reducer (the second field is used for sorting):

11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1
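
Because KeyFieldBasedComparator mirrors GNU sort's -k/-n/-r options, the same ordering can be reproduced with a few lines of Python (an illustration only, assuming well-formed four-field keys):

keys = ["11.12.1.2", "11.14.2.3", "11.11.4.1", "11.12.1.1", "11.14.2.2"]
# '-k2,2nr': sort on the second '.'-separated field, numerically, reversed
keys.sort(key=lambda k: int(k.split(".")[1]), reverse=True)
print "\n".join(keys)   # prints the sorted order shown above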

Working with the Hadoop Aggregate Package (the -reducer aggregate option)

Hadoop has a library package called Aggregate. Aggregate provides a special reducer class and a special combiner class, and a list of simple aggregators that perform aggregations such as "sum", "max", "min" and so on over a sequence of values. Aggregate allows you to define a mapper plugin class that is expected to generate "aggregatable items" for each input key/value pair of the mappers. The combiner/reducer will aggregate those aggregatable items by invoking the appropriate aggregators.

To use Aggregate, simply specify "-reducer aggregate":

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -D mapred.reduce.tasks=12 \
    -input myInputDirs \
    -output myOutputDir \
    -mapper myAggregatorForKeyCount.py \
    -reducer aggregate \
    -file myAggregatorForKeyCount.py

The python program myAggregatorForKeyCount.py looks like:

#!/usr/bin/python

import sys

def generateLongCountToken(id):
    # Emit an "aggregatable item": the LongValueSum aggregator
    # sums the trailing 1s for each id.
    return "LongValueSum:" + id + "\t" + "1"

def main(argv):
    for line in sys.stdin:          # iteration stops cleanly at end of file
        fields = line.rstrip("\n").split("\t")
        print generateLongCountToken(fields[0])

if __name__ == "__main__":
    main(sys.argv)
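
To make the data flow concrete, here is a small worked example with invented input keys. If the first tab-separated field of the input lines is apple, apple, pear, the mapper emits one token per line:

LongValueSum:apple	1
LongValueSum:apple	1
LongValueSum:pear	1

and the aggregate reducer sums the trailing counts per id, producing:

apple	2
pear	1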

Field Selection (similar to the Unix 'cut' command)

Hadoop has a library class, org.apache.hadoop.mapred.lib.FieldSelectionMapReduce, that effectively allows you to process text data like the unix "cut" utility. The map function defined in the class treats each input key/value pair as a list of fields. You can specify the field separator (the default is the tab character). You can select an arbitrary list of fields as the map output key, and an arbitrary list of fields as the map output value. Similarly, the reduce function defined in the class treats each input key/value pair as a list of fields. You can select an arbitrary list of fields as the reduce output key, and an arbitrary list of fields as the reduce output value. For example:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -D map.output.key.field.separator=. \
    -D mapred.text.key.partitioner.options=-k1,2 \
    -D mapred.data.field.separator=. \
    -D map.output.key.value.fields.spec=6,5,1-3:0- \
    -D reduce.output.key.value.fields.spec=0-2:5- \
    -D mapred.reduce.tasks=12 \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
    -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

The option "-D map.output.key.value.fields.spec=6,5,1-3:0-" specifies key/value selection for the map outputs. Key selection spec and value selection spec are separated by ":". In this case, the map output key will consist of fields 6, 5, 1, 2, and 3. The map output value will consist of all fields (0- means field 0 and all the subsequent fields).

The option "-D reduce.output.key.value.fields.spec=0-2:5-" specifies key/value selection for the reduce outputs. In this case, the reduce output key will consist of fields 0, 1, 2 (corresponding to the original fields 6, 5, 1). The reduce output value will consist of all fields starting from field 5 (corresponding to all the original fields).
