Skynet

---------- ---------- 我的新 blog : liukaiyi.cublog.cn ---------- ----------

  BlogJava :: 首页 :: 联系 :: 聚合  :: 管理
  112 Posts :: 1 Stories :: 49 Comments :: 0 Trackbacks

#


首先 本文中的 hadoop join  在实际开发没有用处!
如果在开发中 请使用 cascading  groupby, 进行 hadoop join,
本文只是为探讨弄懂 cascading 实现做准备。

当然 如果有有人 hadoop join 过 请联系我,大家交流下 !

文件可能需要的一些参考:
hadoop jython ( windows )
jython ,jython 编译以及jar 包
少量 linux shell


本文介绍 hadoop 可能使用到的 join 接口测试 ,已经参考:
使用Hadoop实现Inner Join操作的方法【from淘宝】:http://labs.chinamobile.com/groups/58_547

下面 测试后 ,我这大体上 对 hadoop  join 的方式是这样理解的 (猜想):
数据1 ; 数据2
job1.map( 数据1 ) =(临时文件1)>  文件标示1+需要join列  数据
job2.map( 数据2 ) =(临时文件2)>  文件标示2+需要join列  数据

临时文件 mapred.join.expr 生成
job3.map ->
文件标示1+需要join列 : 数据
文件标示2+需要join列 : 数据
......
job3.Combiner - >
需要join列 : 文件标示1+数据
需要join列 : 文件标示2+数据
job3.Reducer->
需要join列 : 使用 java-list > 生成
  文件2-列x [  数据,数据... ]
  文件1-列x [  数据,数据... ]
然后 你这 left join ,或 inner join 或 xxx join 逻辑 就自己来吧


结果集合
[root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/1
1
2
3
4
5
[root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/2
2
4
3
1

修改 ..../hadoop-0.18.3/src/examples/python/compile
#!/usr/bin/env bash

export HADOOP_HOME
=/home/xx/del/jobs/tools/hadoop-0.18.3
export CASCADING_HOME
=/home/xx/del/jobs/tools/cascading-1.0.16-hadoop-0.18.3
export JYTHON_HOME
=/home/xx/del/jobs/tools/jython2.2.1

export CLASSPATH
="$HADOOP_HOME/hadoop-0.18.3-core.jar"                                            

# so that filenames w/ spaces are handled correctly in loops below
IFS=

# add libs to CLASSPATH

for f in $HADOOP_HOME/lib/*.jar; do                                                               
  CLASSPATH
=${CLASSPATH}:$f;
done

for f in $HADOOP_HOME/lib/jetty-ext/*.jar; do
  CLASSPATH
=${CLASSPATH}:$f;
done

for f in $CASCADING_HOME/*.jar; do
  CLASSPATH
=${CLASSPATH}:$f;
done

for f in $CASCADING_HOME/lib/*.jar; do
  CLASSPATH
=${CLASSPATH}:$f;
done


for f in $JYTHON_HOME/*.jar; do
  CLASSPATH
=${CLASSPATH}:$f;
done

# restore ordinary behaviour
unset IFS

/home/xx/del/jobs/tools/jython2.2.1/jythonc -p org.apache.hadoop.examples --j $1.jar  -c $1.py 
/home/xx/del/jobs/tools/hadoop-0.18.3/bin/hadoop jar $1.jar $2 $3 $4 $5 $6 $7 $8 $9 


简单 数据 链接 :
from org.apache.hadoop.fs import Path                                                             
from org.apache.hadoop.io import *                                                                
from org.apache.hadoop.mapred.lib import *                                                        
from org.apache.hadoop.mapred.join  import *                                                      
from org.apache.hadoop.mapred import *                                                            
import sys                                                                                        
import getopt                                                                                     
                                                                                                  
class tMap(Mapper, MapReduceBase):                                                                
        
def map(self, key, value, output, reporter):                                              
                output.collect( Text( str(key) ) , Text( value.toString() ))                      
                                                                                       
                               
def main(args):                                                                                   
        conf 
= JobConf(tMap)                                                                      
        conf.setJobName(
"wordcount")                                                              
                                                                                                  
        conf.setMapperClass( tMap )                                                               

        FileInputFormat.setInputPaths(conf,[ Path(sp) for sp in args[1:-1]])                      
        conf.setOutputKeyClass( Text )
        conf.setOutputValueClass( Text )                                                         

        conf.setOutputPath(Path(args[
-1]))                                                        
        
        JobClient.runJob(conf)                                                                    
        
if __name__ == "__main__":main(sys.argv)     

运行
./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2   file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc78
结果:
[xx@localhost wc78]$ cat ../wc78/part-00000
0    1
0    2
2    4
2    2
4    3
4    3
6    1
6    4
8    5


简单的数据 join :
from org.apache.hadoop.fs import Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred.lib import *
from org.apache.hadoop.mapred.join  import *
from org.apache.hadoop.mapred import *
import sys
import getopt

class tMap(Mapper, MapReduceBase):
        
def map(self, key, value, output, reporter):
                output.collect( Text( str(key) ) , Text( value.toString() ))

def main(args):
        conf 
= JobConf(tMap)
        conf.setJobName(
"wordcount")
        conf.setMapperClass( tMap )

        conf.set("mapred.join.expr", CompositeInputFormat.compose("override",TextInputFormat, args[1:-1] ) )
        conf.setOutputKeyClass( Text )
        conf.setOutputValueClass( Text )

        conf.setInputFormat(CompositeInputFormat)
     
        conf.setOutputPath(Path(args[
-1]))

        JobClient.runJob(conf)

if __name__ == "__main__":main(sys.argv)
        

运行结果 (  ) :
./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2   file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc79
[xx@localhost wc78]$ cat ../wc79/part-00000
0    2
2    4
4    3
6    1
8    5











posted @ 2009-09-08 10:39 刘凯毅 阅读(1638) | 评论 (2)编辑 收藏

参考 : hadoop window 搭建 后,由于对 py 的语法喜欢 ,一直想 把hadoop,改成jython 的
这次 在 自己电脑上  终于 完成,下面介绍过程:

测试环境:
依然的 windows + cygwin
hadoop 0.18  # C:/cygwin/home/lky/tools/java/hadoop-0.18.3
jython 2.2.1 # C:/jython2.2.1

参考: PythonWordCount

启动 hadoop 并到 hdoop_home 下
# 在云环境中创建 input 目录
$>bin/hadoop dfs -mkdir input

# 在 包 hadoop 的 NOTICE.txt 拷贝到 input 目录下
$>bin/hadoop dfs -copyFromLocal c:/cygwin/home/lky/tools/java/hadoop-0.18.3/NOTICE.txt  hdfs:///user/lky/input

$>cd
src/examples/python

# 创建 个 脚本 ( jy->jar->hd run  ) 一步完成!
# 当然 在 linux 写个脚本比这 好看 呵呵!
$>vim run.bat
"C:\Program Files\Java\jdk1.6.0_11\bin\java.exe"  -classpath "C:\jython2.2.1\jython.jar;%CLASSPATH%" org.python.util.jython C:\jython2.2.1\Tools\jythonc\jythonc.py   -p org.apache.hadoop.examples -d -j wc.jar -c %1

sh C:\cygwin\home\lky\tools\java\hadoop-
0.18.3\bin\hadoop jar wc.jar  %2 %3 %4 %5 %6 %7 %8 %9

# 修改 jythonc 打包 环境 。 +hadoop jar
$>vim C:\jython2.2.1\Tools\jythonc\jythonc.py
# Copyright (c) Corporation for National Research Initiatives
# Driver script for jythonc2.  See module main.py for details
import sys,os,glob

for fn in glob.glob('c:/cygwin/home/lky/tools/java/hadoop-0.18.3/*.jar') :sys.path.append(fn)
for fn in glob.glob('c:/jython2.2.1/*.jar') :sys.path.append(fn)
for fn in glob.glob('c:/cygwin/home/lky/tools/java/hadoop-0.18.3/lib/*.jar'
) :sys.path.append(fn)

import main
main.main()

import os
os._exit(0)


# 运行
C:/cygwin/home/lky/tools/java/hadoop-0.18.3/src/examples/python>
  run.bat WordCount.py  hdfs:///user/lky/input  file:///c:/cygwin/home/lky/tools/java/hadoop-0.18.3/tmp2




结果输出:

cat c:/cygwin/home/lky/tools/java/hadoop-0.18.3/tmp2/part-00000
(http://www.apache.org/).       1
Apache  1
Foundation      1
Software        1
The     1
This    1
by      1
developed       1
includes        1
product 1
software        1

下面重头来了 :(简洁的 jy hdoop 代码)
#
#
 Licensed to the Apache Software Foundation (ASF) under one
#
 or more contributor license agreements.  See the NOTICE file
#
 distributed with this work for additional information
#
 regarding copyright ownership.  The ASF licenses this file
#
 to you under the Apache License, Version 2.0 (the
#
 "License"); you may not use this file except in compliance
#
 with the License.  You may obtain a copy of the License at
#
#
     http://www.apache.org/licenses/LICENSE-2.0
#
#
 Unless required by applicable law or agreed to in writing, software
#
 distributed under the License is distributed on an "AS IS" BASIS,
#
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
 See the License for the specific language governing permissions and
#
 limitations under the License.
#

from org.apache.hadoop.fs import Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred import *

import sys
import getopt

class WordCountMap(Mapper, MapReduceBase):
    one 
= IntWritable(1)
    
def map(self, key, value, output, reporter):
        
for w in value.toString().split():
            output.collect(Text(w), self.one)

class Summer(Reducer, MapReduceBase):
    
def reduce(self, key, values, output, reporter):
        sum 
= 0
        
while values.hasNext():
            sum 
+= values.next().get()
        output.collect(key, IntWritable(sum))

def printUsage(code):
    
print "wordcount [-m <maps>] [-r <reduces>] <input> <output>"
    sys.exit(code)

def main(args):
    conf 
= JobConf(WordCountMap);
    conf.setJobName(
"wordcount");
 
    conf.setOutputKeyClass(Text);
    conf.setOutputValueClass(IntWritable);
    
    conf.setMapperClass(WordCountMap);        
    conf.setCombinerClass(Summer);
    conf.setReducerClass(Summer);
    
try:
        flags, other_args 
= getopt.getopt(args[1:], "m:r:")
    
except getopt.GetoptError:
        printUsage(
1)
    
if len(other_args) != 2:
        printUsage(
1)
    
    
for f,v in flags:
        
if f == "-m":
            conf.setNumMapTasks(int(v))
        
elif f == "-r":
            conf.setNumReduceTasks(int(v))
    conf.setInputPath(Path(other_args[0]))
    conf.setOutputPath(Path(other_args[
1]))
    JobClient.runJob(conf);

if __name__ == "__main__":
    main(sys.argv)





posted @ 2009-09-04 17:14 刘凯毅 阅读(1952) | 评论 (0)编辑 收藏

谢谢 同事 孙超 讲解
这就 把他的 思想 画个图


posted @ 2009-09-01 17:43 刘凯毅 阅读(1302) | 评论 (0)编辑 收藏



代码:
# -*- coding: UTF8 -*-

import sys
# 最小 支持度
sup_min = int(sys.argv[1])

ss 
= ","


# 交易 数据 库
D=[
 
'A,B,C,D',
 
'B,C,E',
 
'A,B,C,E',
 
'B,D,E',
 
'A,B,C,D'
]

print "交易数据库展现" 
for arr in D : print arr
print



'''
rows=int(sys.argv[1])
D=[]
for tid in open('BuyMusic.20090722.mob.prodIds').readlines()[:rows] :
    D.append(tid.split("\n")[0].split("\t")[1])


print "读取 文件结束 BuyMusic.20090722.mob.prodIds !"
'''
#全局 频繁项 收集
sup_data_map = {}
#全局  最大频繁项 收集
is_zsup={}

# 遍历过程 临时 局部  频繁项 收集
mapL = {}

# 第一次 频繁项 收集
def find_frequent_1_itemset(I):
    
if I=='null' or I=='' : return  
    
if mapL.has_key(I): mapL[I]+=1 
    
else: mapL[I]=1

map(find_frequent_1_itemset,[ I  
for TID in D for I  in TID.split(ss) ])

# 刷选掉 小于 最小支持度 的 频繁项
def remove_not_sup_min(map,supmin=sup_min):
    
for k  in [k for k,v in map.items() if v<supmin] :
        
del map[k]
remove_not_sup_min(mapL)

print "第一次 筛选 频繁项 结束!"
print mapL

# 装载 全局 频繁项 最大频繁项
for k,v in mapL.items() : 
    sup_data_map[k]
=v
    is_zsup[k]
=v

# 判定 是否 'BD' 属于  'BCD' 中 
isInTid = lambda I,TID : len(I.split(ss)) == len([i for i in I if i in TID.split(ss)])


# 组合  [A,B] + [A,C] = [A,B.C]
def comb(arr1,arr2):
    tmap
={}
    
for v in arr1+arr2 : tmap[v]="" 
    
return tmap.keys()

# apriori 迭代核心
def runL(mapL,dep):
    mapL2 
= {}
    C
={}
    keys 
= mapL.keys()
    iik
=""
    jjk
=""
    
# 根据 上次  频繁项 ,生成本次 '可能频繁项' 集合 
    for ii in range(len(keys)) : 
        
for jj in range(ii+1,len(keys)) :
            keystr
=comb([ch for ch in keys[ii].split(ss)],[ch for ch in keys[jj].split(ss)])
            
if not len(keystr) == dep : continue
            keystr.sort()
            tk
=ss.join(keystr)
            
if not tk in C : C[tk]=(keys[ii],keys[jj])

    
#  '可能频繁项' 对比 交易数据库  计数
    for tk,z in C.items():
        
for TID in D:
            
if isInTid(tk,TID) :
                
if mapL2.has_key(tk): mapL2[tk]+=1
                
else: mapL2[tk]=1

    
# 刷选掉 小于 最小支持度 的 频繁项
    remove_not_sup_min(mapL2)
    
for k,v in  is_zsup.items() :
        
for k1,v1 in mapL2.items() :
            
if isInTid(k,k1) :
                
del is_zsup[k]
                
break
    
# 全局 频繁项 ,最大频繁项  收集 
    for k,v in mapL2.items() : 
        sup_data_map[k]
=v
        is_zsup[k]
=v
    
print ""+str(dep)+"次 筛选 频繁项 结束!" 
    
return mapL2

# 真正 运行 
ii=1
while mapL :
    ii
=ii+1
    mapL 
= runL(mapL,ii)
    
print mapL

# 全局  频繁项 中 去除 最大频繁项
for k,v in is_zsup.items() :
    
if sup_data_map.has_key(k) : del sup_data_map[k]

print "频繁项"
print sup_data_map
print 
print "最大频繁项"
print is_zsup
print 

print "可信度 展现"
for k,v in  sup_data_map.items() :
    
for k1,v1 in is_zsup.items() :
        
if isInTid(k,k1) :
            
print k,"->",k1,"\t%.1f" %((float(is_zsup[k1])/float(sup_data_map[k]))*100)+"%"




结果:
-bash-3.00$ python ap.py 2
交易数据库展现
A,B,C,D
B,C,E
A,B,C,E
B,D,E
A,B,C,D

第一次 筛选 频繁项 结束!
{'A': 3, 'C': 4, 'B': 5, 'E': 3, 'D': 3}
第2次 筛选 频繁项 结束!
{'C,D': 2, 'C,E': 2, 'A,D': 2, 'A,B': 3, 'A,C': 3, 'B,E': 3, 'B,D': 3, 'B,C': 4}
第3次 筛选 频繁项 结束!
{'A,B,D': 2, 'A,B,C': 3, 'B,C,D': 2, 'B,C,E': 2, 'A,C,D': 2}
第4次 筛选 频繁项 结束!
{'A,B,C,D': 2}
第5次 筛选 频繁项 结束!
{}
频繁项
{'A': 3, 'C': 4, 'B': 5, 'E': 3, 'D': 3, 'C,D': 2, 'C,E': 2, 'A,D': 2, 'A,B': 3, 'A,C': 3, 'A,B,D': 2, 'B,C,D': 2, 'A,C,D': 2, 'B,E': 3, 'B,D': 3, 'B,C': 4, 'A,B,C': 3}

最大频繁项
{'B,C,E': 2, 'A,B,C,D': 2}

可信度 展现
A -> A,B,C,D     66.7%
C -> B,C,E     50.0%
C -> A,B,C,D     50.0%
B -> B,C,E     40.0%
B -> A,B,C,D     40.0%
E -> B,C,E     66.7%
D -> A,B,C,D     66.7%
C,D -> A,B,C,D     100.0%
C,E -> B,C,E     100.0%
A,D -> A,B,C,D     100.0%
A,B -> A,B,C,D     66.7%
A,C -> A,B,C,D     66.7%
A,B,D -> A,B,C,D     100.0%
B,C,D -> A,B,C,D     100.0%
A,C,D -> A,B,C,D     100.0%
B,E -> B,C,E     66.7%
B,D -> A,B,C,D     66.7%
B,C -> B,C,E     50.0%
B,C -> A,B,C,D     50.0%
A,B,C -> A,B,C,D     66.7%

posted @ 2009-08-31 14:25 刘凯毅 阅读(1788) | 评论 (0)编辑 收藏

一些特殊正则元字符说明:
 
1. *? 和 +? 和 {n,}?  懒惰匹配 
  
1.1 非懒惰                   ↓
    echo 
"ab2c121a" |perl -ne 'print $1 if /(.*)"d/;'   #print ab2c12
   
1.2 懒惰                       ↓
    echo 
"ab2c121a" |perl -ne 'print $1 if /(.*?)"d/;'   #print ab
 
2.  回溯引用和前后查找:
  
2.1 向前查找   (?=..)                   ↓
    echo 
"ab2c121a" |perl -ne 'print $1 if /(.*?)(?=2)/;'  #print ab
  
2.2 向后查找 (?<=..)                 ↓
    echo 
"ab2c121a" |perl -ne 'print $1 if /(?<=2)(.*)(?=2)/;' #print c1
  
2.3 负向-/后 查找  (?!..) (?<!..)   
   
#不能匹配 ..                               ↓
   
echo "ab2c121a" |perl -ne 'print $1 if /(?<!2)(c.*)/;'    #print 无
   
echo "ab2c121a" |perl -ne 'print $1 if /(?<!3)(c.*)/;'    #print c121a
  
2.4 条件 ?()  = if   ?()| = if else
   
# ?()  例如  <p> </p> 必须同时出现                ↓      ↓  
   
echo "<p>xx</p>"|perl -ne 'print $2  if /(<p>)?("w*)(?(1)<"/p>)/'   #print  xx
   
echo "<p>xx"|perl -ne 'print $2,""n" if /(<p>)?("w*)(?(1)<"/p>)/'    #print 空
   
echo "xx"|perl -ne 'print $2 if /(<p>)?("w*)(?(1)<"/p>)/'    #print xx
    # ?()|  例如 还是上面的, 
    # 当 有<p> 可以接</p> 也可以接 数字结尾                            ↓

echo 
"<p>xx1</p>"|perl -ne 'print $2  if /(<p>)?("w*)(?(1)<"/p>|"d)/'  #print xx1
echo 
"<p>xx1"|perl -ne 'print $2  if /(<p>)?("w*)(?(1)<"/p>|"d)/'    # print xx




posted @ 2009-08-27 16:04 刘凯毅 阅读(1278) | 评论 (0)编辑 收藏


当熟悉 hash db   python bsddb (db-key 转)
使用确实很方便,但是没有 想 关系数据库中的 select order by 查询 ,感觉比较郁闷! 上网 一顿 google ......

import bsddb
db 
= bsddb.btopen('/tmp/spam.db''c')
for i in range(10): db['%d'%i] = '%d'% (i*i)

db[
'3'# 9 
db.keys() # ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

db.set_location(
'6'# 36 
db.previous() # 25 
db.next() # 36
db.next() # 47


这可以定位,并且 previous , next 什么的 (不过目前好像是针对 string 自然 排序!)
这里比较实用的 demo
import bsddb
db 
= bsddb.btopen('/tmp/spam2.db''c')
db[
"2009-08-14 22:00"]="gg"
db[
"2009-08-15 22:00"]="cc"
db[
"2009-07-15 00:00"]="tt"
db[
"2009-08-16 22:00"]="gg"

# 注意 这 统配 等价 正则 = 2009-08-15.*  
#
 开始 以为能使用 正则 ,但不能 。只能简单的 xxx.* 形式的
db.set_location('2009-08-15')   # ('2009-08-15 22:00', 'cc')
db.next() # ('2009-08-16 22:00', 'gg')

db.set_location(
'2009-08-15')   # ('2009-08-15 22:00', 'cc')
db.previous() #('2009-08-14 22:00', 'gg')









posted @ 2009-08-20 10:52 刘凯毅 阅读(1958) | 评论 (5)编辑 收藏


转:http://www.daniweb.com/forums/thread31449.html
什么都不说了,直接看代码吧。
注解 应该写的比较详细


# liukaiyi 
# 注 k-means ,维度类型 - 数值形式 ( 199 或 23.13 

import sys, math, random

# -- 类化 '数据' 
#
 在 n-维度空间
class Point:
    
def __init__(self, coords, reference=None):
        self.coords 
= coords
        self.n 
= len(coords)
        self.reference 
= reference
    
def __repr__(self):
        
return str(self.coords)

# -- 类化 '聚集点 / 聚类平均距离 点 ' 
#
 -- 在 n-维度空间
#
 -- k-means 核心类
#
 -- 每次 聚集各点 围绕她 进行聚集 
#
 -- 并提供方法 求-聚集后的计算中心点,同时记入 此次 中心点(聚集各点平均距离),为下一次聚集提供中心点.
class Cluster:
    
def __init__(self, points):
        
if len(points) == 0: raise Exception("ILLEGAL: EMPTY CLUSTER")
        self.points 
= points
        self.n 
= points[0].n
    
for p in points:
            
if p.n != self.n: raise Exception("ILLEGAL: MULTISPACE CLUSTER")
        
# 求 聚集各点后 平均点
    self.centroid = self.calculateCentroid()
    
def __repr__(self):
        
return str(self.points)
    
    
# 更新 中心点,并返回 原中心点 与 现中心点(聚集各点平均距离)距离  
    def update(self, points):
        old_centroid 
= self.centroid
        self.points 
= points
        self.centroid 
= self.calculateCentroid()
        
return getDistance(old_centroid, self.centroid)
    
    
# 计算平均点 (聚集/收集各点(离本类的中心点)最近数据,后生成新的 中心点 )
    def calculateCentroid(self):
        centroid_coords 
= []
        
#  维度 迭代
    for i in range(self.n):
            centroid_coords.append(
0.0)
            
# 收集各点 迭代 
        for p in self.points:
                centroid_coords[i] 
= centroid_coords[i]+p.coords[i]
            centroid_coords[i] 
= centroid_coords[i]/len(self.points)
        
return Point(centroid_coords)

# -- 返回根据 k-means 聚集形成的 数据集 
def kmeans(points, k, cutoff):
    
# Randomly sample k Points from the points list, build Clusters around them
    initial = random.sample(points, k)
    clusters 
= []
    
for p in initial: clusters.append(Cluster([p]))
    
# 迭代 k-means 直到 每次迭代 各收集点 别的 最多 不超过 0.5 
    while True:
        
#  k 个收集 数组
        lists = []
        
for c in clusters: lists.append([])
    
# 迭代 每个 数据点 ,并计算与每个中心点距离
    # 并把数据点添加入相应最短的中心点收集数组中
    # 在迭代中 smallest_distance 为每个点与各中心点最短距离 参数,请注意看
        for p in points:
            smallest_distance 
= getDistance(p, clusters[0].centroid)
            index 
= 0
            
for i in range(len(clusters[1:])):
                distance 
= getDistance(p, clusters[i+1].centroid)
                
if distance < smallest_distance:
                    smallest_distance 
= distance
                    index 
= i+1
            
# 添加到 离最短中心距离的 数组中
        lists[index].append(p)
    
        
# 聚集完,计算新 中心点
    # 并 cluster.centroid 属性记入下 新中心点(下一次 聚集的中心点 )
    # 并 计算与上一次 中心点 距离 ,如果 差值在 cutoff 0.5 以下 ,跳出迭代 (结束,返回最后一次 聚集集合)
    biggest_shift = 0.0
        
for i in range(len(clusters)):
            shift 
= clusters[i].update(lists[i])
            biggest_shift 
= max(biggest_shift, shift)
        
if biggest_shift < cutoff: break
    
return clusters


# -- 得到欧几里德距离两点之间 
def getDistance(a, b):
    
# Forbid measurements between Points in different spaces
    if a.n != b.n: raise Exception("ILLEGAL: NON-COMPARABLE POINTS")
    
# Euclidean distance between a and b is sqrt(sum((a[i]-b[i])^2) for all i)
    ret = 0.0
    
for i in range(a.n):
        ret 
= ret+pow((a.coords[i]-b.coords[i]), 2)
    
return math.sqrt(ret)

# -- 在 n-维度 空间中创建 随机点
#
 -- 随机生成 测试数据
def makeRandomPoint(n, lower, upper):
    coords 
= []
    
for i in range(n): coords.append(random.uniform(lower, upper))
    
return Point(coords)

# main 
def main(args):
    
# 参数说明
    # num_points,    n,    k,      cutoff,         lower,        upper 
    # 随机数据数量 , 维度, 聚集数, 跳出迭代最小距离 ,   维度数最大值,维度数最小值
    num_points, n, k, cutoff, lower, upper = 10230.5-200200

    
# 在 n-维度空间里 , 创建 num_points 随机点
    # 测试数据生成 
    points = []
    
for i in range(num_points): points.append(makeRandomPoint(n, lower, upper))

    
# 使用 k-means 算法,来 聚集数据点 (算法入口点)
    clusters = kmeans(points, k, cutoff)

    
print "\nPOINTS:"
    
for p in points: print "P:", p
    
print "\nCLUSTERS:"
    
for c in clusters: print "C:", c
if __name__ == "__main__": main(sys.argv)

posted @ 2009-08-07 16:20 刘凯毅 阅读(2034) | 评论 (0)编辑 收藏



1. vi /etc/vsftpd/vsftpd.conf
   添加:
listen=YES
tcp_wrappers=YES
port_enable=YES
ftp_data_port=20
listen_port=21
listen_address=0.0.0.0
port_promiscuous=NO
no_anon_password=NO
anon_mkdir_write_enable=no

2.将chroot_list_enable=YES前的#去掉
  并将chroot_list_file=/etc/vsftpd.chroot_list 前的#去掉

3.创建用户
   useradd 用户
   passwd 用户

4. vi /etc/vsftpd.chroot_list
   将 用户 添加到文件里

5.修改用户的登录路径(主目录)
  vi /etc/passwd
    如:data:x:516:516::/home/data/data:/sbin/nologin

6.启动vsftp
  service vsftpd restart

posted @ 2009-07-27 15:46 刘凯毅 阅读(327) | 评论 (0)编辑 收藏


Java 代码:
package com.xunjie.dmsp.olduser;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

/**
 * test.txt: 
 * 1    a
 * 2    b
 * 3    c
 * 
 * /data/hadoop/hadoop/bin/hadoop jar 
 *         dmsp_test_jar-1.0-SNAPSHOT-dependencies.jar 
 *             hdfs:/user/hadoop/test/lky/test.txt
 *             file:///data/hadoop/test/lky/output
 
*/
public class Test2 {
    
public static void main(String[] args) {
        
        
//设定输入文件
        String sourcePath= args[0];
        
//设置输出文件夹
        String sinkPath = args[1];

        
//定义读取列
        Fields inputfields = new Fields("num""value");
        
//定义分解正则,默认 \t
        RegexSplitter spliter = new RegexSplitter(inputfields);
        
        
        
//管道定义
        Pipe p1 = new Pipe( "test" );
        
//管道嵌套:
        
//分解日志源文件,输出给定字段
        p1 = new Each(p1,new Fields("line") ,spliter);
        
        
        
//设定输入和输出 ,使用 泛型Hfs
        Tap source = new Hfs( new TextLine(),  sourcePath );
        Tap sink 
= new Hfs( new TextLine() , sinkPath );
        
        
        
        
//配置job
        Properties properties = new Properties();
        properties.setProperty(
"hadoop.job.ugi""hadoop,hadoop");
        
        FlowConnector.setApplicationJarClass( properties, Main.
class );
        FlowConnector flowConnector 
= new FlowConnector(properties);
        
        Flow importFlow 
= flowConnector.connect( "import flow", source,sink,p1);
        
        importFlow.start();
        importFlow.complete();
        

    }
}


posted @ 2009-07-22 10:01 刘凯毅 阅读(645) | 评论 (0)编辑 收藏




这特殊关注下,开启慢查询。在web开发中很有帮助

MYSQL启用日志,和查看日志

时间:2009-01-21 17:33:57  来源:http://wasabi.javaeye.com/blog/318962  作者:kenbli
mysql有以下几种日志:  
   错误日志:     -log-err  
   查询日志:     -log  
   慢查询日志:   -log-slow-queries  
   更新日志:     -log-update  
   二进制日志: -log-bin  


是否启用了日志 
mysql>show variables like 'log_%'; 

怎样知道当前的日志 
mysql> show master status; 

顯示二進制日志數目 
mysql> show master logs; 

看二进制日志文件用mysqlbinlog 
shell>mysqlbinlog mail-bin.000001 
或者shell>mysqlbinlog mail-bin.000001 | tail 

在配置文件中指定log的輸出位置. 
Windows:Windows 的配置文件为 my.ini,一般在 MySQL 的安装目录下或者 c:\Windows 下。 
Linux:Linux 的配置文件为 my.cnf ,一般在 /etc 下。 

在linux下: 
Sql代码
  1. # 在[mysqld] 中輸入   
  2. #log   
  3. log-error=/usr/local/mysql/log/error.log   
  4. log=/usr/local/mysql/log/mysql.log   
  5. long_query_time=2   
  6. log-slow-queries= /usr/local/mysql/log/slowquery.log  


windows下: 
Sql代码
  1. # 在[mysqld] 中輸入   
  2. #log   
  3. log-error="E:/PROGRA~1/EASYPH~1.0B1/mysql/logs/error.log"  
  4. log="E:/PROGRA~1/EASYPH~1.0B1/mysql/logs/mysql.log"  
  5. long_query_time=2   
  6. log-slow-queries= "E:/PROGRA~1/EASYPH~1.0B1/mysql/logs/slowquery.log"  


开启慢查询 
long_query_time =2  --是指执行超过多久的sql会被log下来,这里是2秒 
log-slow-queries= /usr/local/mysql/log/slowquery.log  --将查询返回较慢的语句进行记录 

log-queries-not-using-indexes = nouseindex.log  --就是字面意思,log下来没有使用索引的query 

log=mylog.log  --对所有执行语句进行记录
posted @ 2009-07-19 10:50 刘凯毅 阅读(593) | 评论 (0)编辑 收藏

仅列出标题
共12页: 上一页 1 2 3 4 5 6 7 8 9 下一页 Last