随笔-200  评论-148  文章-15  trackbacks-0
转自http://bbs.csdn.net/topics/390369075 

最近项目中使用ETL工具kettle(4.+)对数据库的数据进行清洗,使用工具spoon来使用一些图形化的操作比较简单,抽空研究了下用使用kettle的一些jar包,把kettle结合到java(web项目一样)程序中。留作一个记录,以后备用查看。

    先看看网站上下了很多资料看http://infocenter.pentaho.com/help/index.jsp?topic=%2Fcat_dev_guides%2Ftop_dev_guides.html(主要看Developer Guides/Embedding and Extending Pentaho Data Integration/...)
    下载kettle的api和源码看看,也可以帮你解决不少问题的。
    下面是我自己写(加参考)的一个生成.ktr文件的代码。
    (添加的jar包,我也没有太多的注意,看例子加入(有些可能没有必要,可以尝试的去掉一些测试下)的:
avalon-framework-4.1.3.jar
commons-collections-3.2.jar
commons-io-1.4.jar
commons-lang-2.4.jar
commons-logging-1.1.jar
commons-vfs-20091118-pentaho.jar
kettle-core-4.4.0-GA.jar
kettle-db-4.4.0-GA.jar
kettle-engine-4.4.0-GA.jar
log4j-1.2.12.jar
logkit-1.0.1.jar
servlet-api-2.3.jar
)

Java code
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package com.yoodo.trans;
 
import java.io.File;
 
import org.apache.commons.io.FileUtils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
 
public class TransDemo {
     public static TransDemo transDemo;
     
     /**
      * 两个库中的表名
      */
     public static String bjdt_tablename = "T_USER";
     public static String kettle_tablename = "T_USER";
      
    /**
     * 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml)
     */
     public static final String[] databasesXML = {
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
              "<connection>" +
                "<name>bjdt</name>" +
                "<server>192.168.1.101</server>" +
                "<type>Oracle</type>" +
                "<access>Native</access>" 
                "<database>orcl</database>" +
                "<port>1521</port>" +
                "<username>bjdtuser</username>" +
                "<password>password</password>" +
              "</connection>",
              "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
              "<connection>" +
                "<name>kettle</name>" +
                "<server>192.168.1.101</server>" +
                "<type>Oracle</type>" +
                "<access>Native</access>" 
                "<database>orcl</database>" +
                "<port>1521</port>" +
                "<username>kettleuser</username>" +
                "<password>password</password>" +
              "</connection>"
        };   
    /**
     * @param args
     */
    public static void main(String[] args) {
        try {
            KettleEnvironment.init();
            transDemo = new TransDemo();
            TransMeta transMeta = transDemo.generateMyOwnTrans();
            String transXml = transMeta.getXML();
            //System.out.println("transXml:"+transXml);
            String transName = "etl/update_insert_Trans.ktr";
            File file = new File(transName);
            FileUtils.writeStringToFile(file, transXml, "UTF-8");
             
        //    System.out.println(databasesXML.length+"\n"+databasesXML[0]+"\n"+databasesXML[1]);
        catch (Exception e) {
            e.printStackTrace();
            return;
        }
         
    }
     
    /**
     * 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是表输入,第二个是表插入与更新操作
     * @return
     * @throws KettleXMLException 
     */
    public TransMeta generateMyOwnTrans() throws KettleXMLException{
         
        System.out.println("************start to generate my own transformation***********");
         
        TransMeta transMeta = new TransMeta();
         
        //设置转化的名称 
        transMeta.setName("insert_update");
         
        //添加转换的数据库连接
        for (int i=0;i<databasesXML.length;i++){
            DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
            transMeta.addDatabase(databaseMeta);
        }
         
        //registry是给每个步骤生成一个标识Id用
        PluginRegistry registry = PluginRegistry.getInstance();
         
        //******************************************************************
         
        //第一个表输入步骤(TableInputMeta)
        TableInputMeta tableInput = new TableInputMeta();
        String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput);
        //给表输入添加一个DatabaseMeta连接数据库
        DatabaseMeta database_bjdt = transMeta.findDatabase("bjdt");
        tableInput.setDatabaseMeta(database_bjdt);
        String select_sql = "SELECT ID, USERNAME, PASSWORD, SEX, AGE, TELEPHONE, ADDRESS FROM "+bjdt_tablename;
        tableInput.setSQL(select_sql);
         
        //添加TableInputMeta到转换中
        StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId,"table input",tableInput);
         
        //给步骤添加在spoon工具中的显示位置
        tableInputMetaStep.setDraw(true);
        tableInputMetaStep.setLocation(100100);
         
        transMeta.addStep(tableInputMetaStep);
        //******************************************************************
         
        //******************************************************************
        //第二个步骤插入与更新
        InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
        String insertUpdateMetaPluginId = registry.getPluginId(StepPluginType.class,insertUpdateMeta);
        //添加数据库连接
        DatabaseMeta database_kettle = transMeta.findDatabase("kettle");
        insertUpdateMeta.setDatabaseMeta(database_kettle);
        //设置操作的表
        insertUpdateMeta.setTableName(kettle_tablename);
         
        //设置用来查询的关键字
        insertUpdateMeta.setKeyLookup(new String[]{"ID"});
        insertUpdateMeta.setKeyStream(new String[]{"ID"});
        insertUpdateMeta.setKeyStream2(new String[]{""});//一定要加上
        insertUpdateMeta.setKeyCondition(new String[]{"="});
         
        //设置要更新的字段
        String[] updatelookup = {"ID","USERNAME","PASSWORD","SEX","AGE","TELEPHONE","ADDRESS"} ;
         String [] updateStream = {"ID","USERNAME","PASSWORD","SEX","AGE","TELEPHONE","ADDRESS"};
         Boolean[] updateOrNot = {false,true,true,true,true,true,true};
         insertUpdateMeta.setUpdateLookup(updatelookup);
        insertUpdateMeta.setUpdateStream(updateStream);
        insertUpdateMeta.setUpdate(updateOrNot);
        String[] lookup = insertUpdateMeta.getUpdateLookup();
        //System.out.println("******:"+lookup[1]);
        //System.out.println("insertUpdateMetaXMl:"+insertUpdateMeta.getXML());
        //添加步骤到转换中
        StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId,"insert_update",insertUpdateMeta);
        insertUpdateStep.setDraw(true);
        insertUpdateStep.setLocation(250,100);
        transMeta.addStep(insertUpdateStep);
        //******************************************************************
         
        //******************************************************************
        //添加hop把两个步骤关联起来
        transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep));
        System.out.println("***********the end************");
        return transMeta;
    }
}
posted on 2014-03-02 08:15 无声 阅读(1422) 评论(0)  编辑  收藏

只有注册用户登录后才能发表评论。


网站导航: