随笔-299  评论-208  文章-0  trackbacks-0
  2017年8月14日
1. 日期输出格式化

所有日期、时间的api都在datetime模块内。

1. datetime => string

now = datetime.datetime.now()
now.strftime('%Y-%m-%d %H:%M:%S')
#输出2012-03-05 16:26:23.870105

strftime是datetime类的实例方法。

2. string => datetime

t_str = '2012-03-05 16:26:23'
d = datetime.datetime.strptime(t_str, '%Y-%m-%d %H:%M:%S')

strptime是datetime类的静态方法。

2. 日期比较操作

在datetime模块中有timedelta类,这个类的对象用于表示一个时间间隔,比如两个日期或者时间的差别。

构造方法:

datetime.timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)

所有的参数都有默认值0,这些参数可以是int或float,正的或负的。

可以通过timedelta.days、tiemdelta.seconds等获取相应的时间值。

timedelta类的实例,支持加、减、乘、除等操作,所得的结果也是timedelta类的实例。比如:

year = timedelta(days=365)
ten_years = year *10
nine_years = ten_years - year

同时,date、time和datetime类也支持与timedelta的加、减运算。

datetime1 = datetime2 +/- timedelta
timedelta = datetime1 - datetime2

这样,可以很方便的实现一些功能。

1. 两个日期相差多少天。

d1 = datetime.datetime.strptime('2012-03-05 17:41:20', '%Y-%m-%d %H:%M:%S')
d2 = datetime.datetime.strptime('2012-03-02 17:41:20', '%Y-%m-%d %H:%M:%S')
delta = d1 - d2
print delta.days

输出:3

2. 今天的n天后的日期。

now = datetime.datetime.now()
delta = datetime.timedelta(days=3)
n_days = now + delta
print n_days.strftime('%Y-%m-%d %H:%M:%S')

输出:2012-03-08 17:44:50

复制代码
#coding=utf-8
import datetime
now=datetime.datetime.now()
print now
#将日期转化为字符串 datetime => string
print now.strftime('%Y-%m-%d %H:%M:%S')

t_str = '2012-03-05 16:26:23'
#将字符串转换为日期 string => datetime
d=datetime.datetime.strptime(t_str,'%Y-%m-%d %H:%M:%S')
print d

#在datetime模块中有timedelta类,这个类的对象用于表示一个时间间隔,比如两个日#期或者时间的差别。

#计算两个日期的间隔
d1 = datetime.datetime.strptime('2012-03-05 17:41:20', '%Y-%m-%d %H:%M:%S')
d2 = datetime.datetime.strptime('2012-03-02 17:41:20', '%Y-%m-%d %H:%M:%S')
delta = d1 - d2
print delta.days
print delta

#今天的n天后的日期。
now=datetime.datetime.now()
delta=datetime.timedelta(days=3)
n_days=now+delta
print n_days.strftime('%Y-%m-%d %H:%M:%S')
复制代码
posted @ 2017-08-14 23:09 xzc 阅读(10) | 评论 (0)编辑 收藏
  2017年8月2日

Shell中并没有真正意义的多线程,要实现多线程可以启动多个后端进程,最大程度利用cpu性能。

直接看代码示例吧。

(1) 顺序执行的代码

复制代码
 1 #!/bin/bash  2 date  3 for i in `seq 1 5`  4 do  5 {  6     echo "sleep 5"  7     sleep 5  8 }  9 done 10 date 
复制代码

输出:

复制代码
Sat Nov 19 09:21:51 CST 2016 sleep 5 sleep 5 sleep 5 sleep 5 sleep 5 Sat Nov 19 09:22:16 CST 2016
复制代码

(2) 并行代码

使用'&'+wait 实现“多进程”实现

复制代码
 1 #!/bin/bash  2 date  3 for i in `seq 1 5`  4 do  5 {  6     echo "sleep 5"  7     sleep 5  8 } &  9 done 10 wait  ##等待所有子后台进程结束 11 date
复制代码

输出:

复制代码
Sat Nov 19 09:25:07 CST 2016 sleep 5 sleep 5 sleep 5 sleep 5 sleep 5 Sat Nov 19 09:25:12 CST 2016
复制代码

 (3) 对于大量处理任务如何实现启动后台进程的数量可控?

  简单的方法可以使用2层for/while循环实现,每次wait内层循环的多个后台程序执行完成

  但是这种方式的问题是,如果内层循环有“慢节点”可能导致整个任务的执行执行时间长。

  更高级的实现可以看(4)

(4) 使用命名管道(fifo)实现每次启动后台进程数量可控。 

复制代码
 1 #!/bin/bash  2   3 function my_cmd(){  4     t=$RANDOM  5     t=$[t%15]  6     sleep $t  7     echo "sleep $t s"  8 }  9  10 tmp_fifofile="/tmp/$$.fifo"  11 mkfifo $tmp_fifofile      # 新建一个fifo类型的文件 12 exec 6<>$tmp_fifofile     # 将fd6指向fifo类型 13 rm $tmp_fifofile    #删也可以 14  15 thread_num=5  # 最大可同时执行线程数量 16 job_num=100   # 任务总数 17  18 #根据线程总数量设置令牌个数 19 for ((i=0;i<${thread_num};i++));do 20     echo 21 done >&6  22  23 for ((i=0;i<${job_num};i++));do # 任务数量 24     # 一个read -u6命令执行一次,就从fd6中减去一个回车符,然后向下执行, 25     # fd6中没有回车符的时候,就停在这了,从而实现了线程数量控制 26     read -u6  27  28     #可以把具体的需要执行的命令封装成一个函数 29     {    30         my_cmd 31     } & 32  33     echo >&6 # 当进程结束以后,再向fd6中加上一个回车符,即补上了read -u6减去的那个 34 done 35  36 wait 37 exec 6>&- # 关闭fd6 38 echo "over"
复制代码

 

参考:http://lawrence-zxc.github.io/2012/06/16/shell-thread/

posted @ 2017-08-02 17:01 xzc 阅读(18) | 评论 (0)编辑 收藏
  2017年7月28日

之前在论坛看到一个关于HDFS权限的问题,当时无法回答该问题。无法回答并不意味着对HDFS权限一无所知,而是不能准确完整的阐述HDFS权限,因此决定系统地学习HDFS文件权限。HDFS的文件和目录权限模型共享了POSIX(Portable Operating System Interface,可移植操作系统接口)模型的很多部分,比如每个文件和目录与一个拥有者和组相关联,文件或者目录对于拥有者、组内的其它用户和组外的其它用户有不同的权限等。与POSIX模型不同的是,HDFS中的文件没有可执行文件的概念,因而也没有setuid和setgid,虽然目录依然保留着可执行目录的概念(x),但对于目录也没有setuid和setgid。粘贴位(sticky bit)可以用在目录上,用于阻止除超级用户,目录或文件的拥有者外的任何删除或移动目录中的文件,文件上的粘贴位不起作用。

      当创建文件或目录时,拥有者为运行客户端进程的用户,组为父目录所属的组。每个访问HDFS的客户端进程有一个由用户姓名和组列表两部分组的成标识,无论何时HDFS必须对由客户端进程访问的文件或目录进行权限检查,规则如下:

 

  • 如果进程的用户名匹配文件或目录的拥有者,那么测试拥有者权限
  • 否则如果文件或目录所属的组匹配组列表中任何组,那么测试组权限
  • 否则测试其它权限

 

      如果权限检查失败,则客户端操作失败。

      从hadoop-0.22开始,hadoop支持两种不同的操作模式以确定用户,分别为simple和kerberos具体使用哪个方式由参数hadoop.security.authentication设置,该参数位于core-site.xml文件中,默认值为simple。在simple模式下,客户端进程的身份由主机的操作系统确定,比如在类Unix系统中,用户名为命令whoami的输出。在kerberos模式下,客户端进程的身份由Kerberos凭证确定,比如在一个Kerberized环境中,用户可能使用kinit工具得到了一个Kerberos ticket-granting-ticket(TGT)且使用klist确定当前的principal。当映射一个Kerberosprincipal到HDFS的用户名时,除了最主要的部分外其余部分都被丢弃,比如一个principal为todd/foobar@CORP.COMPANY.COM,将映射为HDFS上的todd。无论哪种操作模式,对于HDFS来说用户标识机制都是外部的,HDFS本身没有创建用户标,建立组或者处理用户凭证的规定。

      上面讨论了确定用户的两种模式,即simple和kerberos,下面学习如何确定用户组。用户组是通过由参数hadoop.security.group.mapping设置的组映射服务确定的,默认实现是org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback,该实现首先确定Java本地接口(JNI)是否可用,如果JNI可用,该实现将使用hadoop中的API为用户解析用户组列表。如果JNI不可用,那么使用ShellBasedUnixGroupsMapping,该实现将使用Linux/Unix中的bash –cgroups命令为用户解析用户组列表。其它实现还有LdapGroupsMapping,通过直接连接LDAP服务器来解析用户组列表。对HDFS来说,用户到组的映射是在NameNode上执行的,因而NameNode的主机系统配置决定了用户的组映射。HDFS将文件或目录的用户和组存储为字符串,并且不像Linux/Unix那样可以将用户和组转换为数字。

      每个针对文件或者目录的操作都将全路径名称传递到NameNode,然后对该路径的每次操作都将应用权限检查。客户端隐含地关联用户身份到NameNode的连接,减少改变现存客户端API的需要。总是存在这么一种情景,当在一个文件上的操作成功后,当重复该操作时可能失败,因为该文件或者路径中的某些目录已经不再存在。例如,当客户端第一次开始读取一个文件时,它向NameNode发出的第一个请求来发现该文件第一个块的位置,第二个寻找其他块的请求可能失败。另一方面,对于已经知道文件块的客户端来说,删除文件不会取消访问。通过添加权限,客户端对文件的访问在请求之间可能撤回,对于已经知道文件块的客户端来说,改变权限不会取消客户端的访问。

      HDFS中超级用户与通常熟悉的Linux或Unix中的root用户不同,HDFS的超级用户是与NameNode进程有相同标示的用户,更简单易懂些,启动NameNode的用户就为超级用户。对于谁是超级用户没有固定的定义,当NameNode启动后,该进程的标示决定了谁是超级用户。HDFS的超级用户不必是NameNode主机的超级用户,也需用所有的集群使用相同的超级用户,出于实验目的在个人工作站上运行HDFS的人自然而然的称为超级用户而不需要任何配置。另外参数dfs.permissions.superusergroup设置了超级用户,该组中的所有用户也为超级用户。超级用户在HDFS中可以执行任何操作而针对超级用户的权限检查永远不会失败。

      HDFS也提供了对POSIX ACL(访问控制列表)支持来为特定的用户或者用户组提供更加细粒度的文件权限。ACL是不同于用户和组的自然组织层次的有用的权限控制方式,ACL可以为特定的用户和组设置不同的权限,而不仅仅是文件的拥有者和文件所属的组。默认情况下,HDFS禁用ACL,因此NameNode禁止ACL的创建,为了启用ACL,需要在hdfs-site.xml中将参数dfs.namenode.acls.enabled设置为true。

      访问控制列表由一组ACL项组成,每个ACL项命名了特定的用户或组,并为其授予或拒绝读,写和执行的权限,例如:

 

user::rw- user:bruce:rwx                  #effective:r-- group::r-x                      #effective:r-- group:sales:rwx                 #effective:r-- mask::r-- other::r-- 

 

      每个ACL项由类型,可选的名称和权限字符串组成,它们之间使用冒号(:)。在上面的例子中文件的拥有者具有读写权限,文件所属的组具有读和执行的权限,其他用户具有读权限,这些设置与将文件设置为654等价(6表示拥有者的读写权限,5表示组的读和执行权限,4表示其他用户的读权限)。除此之外,还有两个扩展的ACL项,分别为用户bruce和组sales,并都授予了读写和执行的权限。mask项是一个特殊的项,用于过滤授予所有命名用户,命名组及未命名组的权限,即过滤除文件拥有者和其他用户(other)之外的任何ACL项。在该例子中,mask值有读权限,则bruce用户、sales组和文件所属的组只具有读权限。每个ACL必须有mask项,如果用户在设置ACL时没有使用mask项,一个mask项被自动加入到ACL中,该mask项是通过计算所有被mask过滤项的权限与(&运算)得出的。对拥有ACL的文件执行chmod实际改变的是mask项的权限,因为mask项扮演的是过滤器的角色,这将有效地约束所有扩展项的权限,而不是仅改变组的权限而可能漏掉其它扩展项的权限。

      访问控制列表和默认访问控制列表存在着不同,前者定义了在执行权限检查实施的规则,后者定义了新文件或者子目录创建时自动接收的ACL项,例如:

user::rwx group::r-x other::r-x default:user::rwx default:user:bruce:rwx          #effective:r-x default:group::r-x default:group:sales:rwx         #effective:r-x default:mask::r-x default:other::r-x 

      只有目录可能拥有默认访问控制列表,当创建新文件或者子目录时,自动拷贝父辈的默认访问控制列表到自己的访问控制列表中,新的子目录也拷贝父辈默认的访问控制列表到自己的默认访问控制列表中。这样,当创建子目录时默认ACL将沿着文件系统树被任意深层次地拷贝。在新的子ACL中,准确的权限由模式参数过滤。默认的umask为022,通常新目录权限为755,新文件权限为644。模式参数为未命名用户(文件的拥有者),mask及其他用户过滤拷贝的权限值。在上面的例子中,创建权限为755的子目录时,模式对最终结果没有影响,但是如果创建权限为644的文件时,模式过滤器导致新文件的ACL中文件拥有者的权限为读写,mask的权限为读以及其他用户权限为读。mask的权限意味着用户bruce和组sales只有读权限。拷贝ACL发生在文件或子目录的创建时,后面如果修改父辈的默认ACL将不再影响已存在子类的ACL。

      默认ACL必须包含所有最小要求的ACL项,包括文件拥有者项,文件所属的组项和其它用户项。如果用户没有在默认ACL中配置上述三项中的任何一个,那么该项将通过从访问ACL拷贝对应的权限来自动插入,或者如果没有访问ACL则自动插入权限位。默认ACL也必须拥有mask,如果mask没有被指定,通过计算所有被mask过滤项的权限与(&运算)自动插入mask。当一个文件拥有ACL时,权限检查的算法变为:

 

  • 如果用户名匹配文件的拥有者,则测试拥有者权限
  • 否则,如果用户名匹配命名用户项中的用户名,则测试由mask权限过滤后的该项的权限
  • 否则,如果文件所属的组匹配组列表中的任何组,并且如果这些被mask过滤的权限具有访问权限,那么使用这么权限
  • 否则,如果存在命名组项匹配组列表中的成员,并且如果这些被mask过滤的权限具有访问权限,那么使用这么权限
  • 否则,如果文件所属的组或者任何命名组项匹配组列表中的成员,但不具备访问权限,那么访问被拒绝
  • 否则测试文件的其他用户权限

 

      最佳实践时基于传统的权限位设置大部分权限要求,然后定义少量带有特殊规则的ACL增加权限位。相比较只是用权限位的文件,使用ACL的文件会在NameNode中产生额外的内存消耗。

      上面学习了HDFS中的文件权限和访问控制列表,最后学习一下如何针对权限和ACL进行配置,下表列出了其中的重要参数:

参数名

位置

用途

dfs.permissions.enabled

hdfs-site.xml

默认值为true,即启用权限检查。如果为 false,则禁用权限检查。

hadoop.http.staticuser.user

core-site.xml

默认值为dr.who,查看web UI的用户

dfs.permissions.superusergroup

hdfs-site.xml

超级用户的组名称,默认为supergroup

<fs.permissions.umask-mode

core-site.xml

创建文件和目录时使用的umask,默认值为八进制022,每位数字对应了拥有者,组和其他用户。该值既可以使用八进制数字,如022,也可以使用符号,如u=rwx,g=r-x,o=r-x(对应022)

dfs.cluster.administrators

hdfs-site.xml

被指定为ACL的集群管理员

dfs.namenode.acls.enabled

hdfs-site.xml

默认值为false,禁用ACL,设置为true则启用ACL。当ACL被禁用时,NameNode拒绝设置或者获取ACL的请求

posted @ 2017-07-28 10:55 xzc 阅读(23) | 评论 (0)编辑 收藏
  2017年7月27日
1. crontab 命令:用于在某个时间,系统自动执行你所希望的程序文件或命令。
2. crontab 的参数
        -e      (edit user's crontab)
        -l      (list user's crontab)
        -r      (delete user's crontab)
        -i      (prompt before deleting user's crontab)
3.下面进行一个例子:在8月6号18时每隔3分钟执行以下命令:who >> /apple/test_crontab.log
   步骤一:先创建一个文件cronfile:内容为如下:
           */3 18 6 8 * who >> /apple/test_crontab_log
   步骤二:将文件cronfile 加入到cron守护进行(命令为:crontab cronfile)
4. 检查是否加入到守护进程cron中,用命令:crontab -l
   如何出来的内容中包含你刚刚的内容,则加入成功。每隔3分钟查看下test_crontab.log文件,看看是否有内容。
5. 对crontab内容格式的解释:f1 f2 f3 f4 f5 program
   f1 是表示分钟(0-59),f2 表示小时(0-23),f3 表示一个月份中的第几日(1-(31、30、29、28)),f4 表示月份(1-12),f5 表示一个星期中的第几天(0-6(0表示周日))。program 表示要执行的程式(可以理解为文件或命令)
   f1:为*时候表示每隔1分钟,如果为*/n 表示每隔n分钟,如果为3,4 表示第3,4分钟,如果为2-6表示第2分钟到第6分钟。
   f2:为*时候表示每隔1小说。如果为*/n 表示每隔n小时,如果为3,4 表示第3,4小时,如果为2-6表示第2小时到第6小时
   f3: 为*时候表示每天。n 表示第n天
   f4: 为*时候表示每月。n 表示第n个月
   f5: 为*时候表示每周。0表示周日,6表示周六,1-4表示周一到周六
6. 具体例子:(来自crontab百度百科)
   a. 每月每天每小时的第 0 分钟执行一次 /bin/ls :   0 * * * * /bin/ls   
   b. 在 12 月内, 每天的早上 6 点到 12 点中,每隔 20 分钟执行一次 /usr/bin/backup :
      */20 6-12 * 12 * /usr/bin/backup  
  c. 周一到周五每天下午 5:00 寄一封信给 alex_mail_name :  
      0 17 * * 1-5 mail -s "hi" alex_mail_name < /tmp/maildata   
   d. 每月每天的午夜 0 点 20 分, 2 点 20 分, 4 点 20 分....执行 echo "haha"   
      20 0-23/2 * * * echo "haha"   
   e. 晚上11点到早上8点之间每两个小时和早上8点 显示日期  0 23-7/2,8 * * * date 
posted @ 2017-07-27 18:59 xzc 阅读(23) | 评论 (0)编辑 收藏
  2017年7月6日
最近一段时间,在处理Shell 脚本时候,遇到时间的处理问题。 时间的加减,以及时间差的计算。 
 
1。 时间加减
 
这里处理方法,是将基础的时间转变为时间戳,然后,需要增加或者改变时间,变成 秒。 
 
如:1990-01-01 01:01:01  加上 1小时 20分
处理方法:
a.将基础时间转为时间戳
time1=$(date +%s -d '1990-01-01 01:01:01')
echo $time1
631126861 【时间戳】
 
b.将增加时间变成秒
[root@localhost ~]# time2=$((1*60*60+20*60))
[root@localhost ~]# echo $time2
4800
 
c.两个时间相加,计算出结果时间
time1=$(($time1+$time2))
time1=$(date +%Y-%m-%d\ %H:%M:%S -d "1970-01-01 UTC $time1 seconds");
echo $time1
1990-01-01 02:21:01
 
2。时间差计算方法
 
如:2010-01-01 与 2009-01-01 11:11:11 时间差
原理:同样转成时间戳,然后计算天,时,分,秒
 
time1=$(($(date +%s -d '2010-01-01') - $(date +%s -d '2009-01-01 11:11:11')));
echo time1
 
将time1 / 60 秒,就变成分了。
 
补充说明:
shell 单括号运算符号:
a=$(date);
等同于:a=`date`;
 
双括号运算符:
a=$((1+2));
echo $a;
等同于:
a=`expr 1 + 2`
posted @ 2017-07-06 16:33 xzc 阅读(42) | 评论 (1)编辑 收藏
  2017年5月18日

可参照:http://www.voidcn.com/blog/Vindra/article/p-4917667.html

一、get请求 

curl "http://www.baidu.com"  如果这里的URL指向的是一个文件或者一幅图都可以直接下载到本地

curl -i "http://www.baidu.com"  显示全部信息

curl -l "http://www.baidu.com" 只显示头部信息

curl -v "http://www.baidu.com" 显示get请求全过程解析

 

wget "http://www.baidu.com"也可以

 

二、post请求

curl -d "param1=value1&param2=value2" "http://www.baidu.com"

 

三、json格式的post请求

curl -l -H "Content-type: application/json" -X POST -d '{"phone":"13521389587","password":"test"}' http://domain/apis/users.json

例如:

curl -l -H "Content-type: application/json" -X POST -d '{"ver": "1.0","soa":{"req":"123"},"iface":"me.ele.lpdinfra.prediction.service.PredictionService","method":"restaurant_make_order_time","args":{"arg2":"\"stable\"","arg1":"{\"code\":[\"WIND\"],\"temperature\":11.11}","arg0":"{\"tracking_id\":\"100000000331770936\",\"eleme_order_id\":\"100000000331770936\",\"platform_id\":\"4\",\"restaurant_id\":\"482571\",\"dish_num\":1,\"dish_info\":[{\"entity_id\":142547763,\"quantity\":1,\"category_id\":1,\"dish_name\":\"[0xe7][0x89][0xb9][0xe4][0xbb][0xb7][0xe8][0x85][0x8a][0xe5][0x91][0xb3][0xe5][0x8f][0x89][0xe7][0x83][0xa7][0xe5][0x8f][0x8c][0xe6][0x8b][0xbc][0xe7][0x85][0xb2][0xe4][0xbb][0x94][0xe9][0xa5][0xad]\",\"price\":31.0}],\"merchant_location\":{\"longitude\":\"121.47831425\",\"latitude\":\"31.27576153\"},\"customer_location\":{\"longitude\":\"121.47831425\",\"latitude\":\"31.27576153\"},\"created_at\":1477896550,\"confirmed_at\":1477896550,\"dishes_total_price\":0.0,\"food_boxes_total_price\":2.0,\"delivery_total_price\":2.0,\"pay_amount\":35.0,\"city_id\":\"1\"}"}}' http://vpcb-lpdinfra-stream-1.vm.elenet.me:8989/rpc

ps:json串内层参数需要格式化

posted @ 2017-05-18 11:28 xzc 阅读(78) | 评论 (1)编辑 收藏
  2017年5月17日
服务器上的一些统计数据:

1)统计80端口连接数
netstat -nat|grep -i "80"|wc -l

2)统计httpd协议连接数
ps -ef|grep httpd|wc -l

3)、统计已连接上的,状态为“established
netstat -na|grep ESTABLISHED|wc -l

4)、查出哪个IP地址连接最多,将其封了.
netstat -na|grep ESTABLISHED|awk {print $5}|awk -F: {print $1}|sort|uniq -c|sort -r +0n

netstat -na|grep SYN|awk {print $5}|awk -F: {print $1}|sort|uniq -c|sort -r +0n

---------------------------------------------------------------------------------------------

1、查看apache当前并发访问数:
netstat -an | grep ESTABLISHED | wc -l

对比httpd.conf中MaxClients的数字差距多少。

2、查看有多少个进程数:
ps aux|grep httpd|wc -l

3、可以使用如下参数查看数据
server-status?auto

#ps -ef|grep httpd|wc -l
1388
统计httpd进程数,连个请求会启动一个进程,使用于Apache服务器。
表示Apache能够处理1388个并发请求,这个值Apache可根据负载情况自动调整。

#netstat -nat|grep -i "80"|wc -l
4341
netstat -an会打印系统当前网络链接状态,而grep -i "80"是用来提取与80端口有关的连接的,wc -l进行连接数统计。
最终返回的数字就是当前所有80端口的请求总数。

#netstat -na|grep ESTABLISHED|wc -l
376
netstat -an会打印系统当前网络链接状态,而grep ESTABLISHED 提取出已建立连接的信息。 然后wc -l统计。
最终返回的数字就是当前所有80端口的已建立连接的总数。

netstat -nat||grep ESTABLISHED|wc - 可查看所有建立连接的详细记录

查看Apache的并发请求数及其TCP连接状态:
Linux命令:
netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'

返回结果示例:
LAST_ACK 5
SYN_RECV 30
ESTABLISHED 1597
FIN_WAIT1 51
FIN_WAIT2 504
TIME_WAIT 1057
其中的
SYN_RECV表示正在等待处理的请求数;
ESTABLISHED表示正常数据传输状态;
TIME_WAIT表示处理完毕,等待超时结束的请求数。

---------------------------------------------------------------------------------------------

查看httpd进程数(即prefork模式下Apache能够处理的并发请求数):
Linux命令:
     ps -ef | grep httpd | wc -l

查看Apache的并发请求数及其TCP连接状态:

Linux命令:
     netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
返回结果示例:
LAST_ACK 5
SYN_RECV 30
ESTABLISHED 1597
FIN_WAIT1 51
FIN_WAIT2 504
TIME_WAIT 1057

说明:
   SYN_RECV表示正在等待处理的请求数;
   ESTABLISHED表示正常数据传输状态;
   TIME_WAIT表示处理完毕,等待超时结束的请求数。
posted @ 2017-05-17 23:12 xzc 阅读(79) | 评论 (2)编辑 收藏
  2017年5月12日

一、回收站简介:

    在HDFS里,删除文件时,不会真正的删除,其实是放入回收站/trash,回收站里的文件可以快速恢复。

    可以设置一个时间阀值,当回收站里文件的存放时间超过这个阀值或是回收站被清空时,文件才会被彻底删除,并且释放占用的数据块。

二、实例:

    Hadoop的回收站trash功能默认是关闭的,所以需要在core-site.xml中手动开启。

1、修改core-site.xml,增加:

复制代码
<property>  <name>fs.trash.interval</name>  <value>1440</value>  <description>Number of minutes between trash checkpoints.  If zero, the trash feature is disabled.  </description>  </property>
复制代码

默认是0,单位是分钟,这里设置为1天。
删除数据rm后,会将数据move到当前文件夹下的.Trash目录。

2、测试

1)、新建目录input

hadoop/bin/hadoop fs -mkdir input

2)、上传文件

root@master:/data/soft# hadoop/bin/hadoop fs -copyFromLocal /data/soft/file0* input

3)、删除目录input

[root@master data]# hadoop fs -rmr input  Moved to trash: hdfs://master:9000/user/root/input

4)、查看当前目录

[root@master data]# hadoop fs -ls  Found 2 items  drwxr-xr-x - root supergroup 0 2011-02-12 22:17 /user/root/.Trash

发现input删除了,多了一个目录.Trash
5)、恢复刚刚删除的目录

[root@master data]# hadoop fs -mv /user/root/.Trash/Current/user/root/input /user/root/input

6)、查看恢复的数据

[root@master data]# hadoop fs -ls input  Found 2 items  -rw-r--r-- 3 root supergroup 22 2011-02-12 17:40 /user/root/input/file01  -rw-r--r-- 3 root supergroup 28 2011-02-12 17:40 /user/root/input/file02

7)、删除.Trash目录(清理垃圾)

[root@master data]# hadoop fs -rmr .Trash  Deleted hdfs://master:9000/user/root/.Trash
posted @ 2017-05-12 11:20 xzc 阅读(58) | 评论 (0)编辑 收藏
  2017年5月10日
     摘要:  以前用redis用的很多,各种数据类型用的飞起,算是用得很溜了。不过那都是封装好的方法,自己直接调用。以前的公司比较规范,开发只是开发,很少去做跟运维相关的事情。             换了一份工作,不过这边项目刚开始起步,各种东西还不是很全,需要从头做起。运维什么的都是自己来。这下要考虑的东西就多了。比如说re...  阅读全文
posted @ 2017-05-10 10:49 xzc 阅读(101) | 评论 (0)编辑 收藏
  2017年4月28日
转自:http://www.cnblogs.com/cyfonly/p/5954614.html

一、为什么需要消息系统

复制代码
1.解耦:   允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 2.冗余:   消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。 3.扩展性:   因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。 4.灵活性 & 峰值处理能力:   在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 5.可恢复性:   系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 6.顺序保证:   在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性) 7.缓冲:   有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。 8.异步通信:   很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
复制代码

 

二、kafka 架构

2.1 拓扑结构

如下图:

图.1

2.2 相关概念

如图.1中,kafka 相关名词解释如下:

复制代码
1.producer:   消息生产者,发布消息到 kafka 集群的终端或服务。 2.broker:   kafka 集群中包含的服务器。 3.topic:   每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。 4.partition:   partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。 5.consumer:   从 kafka 集群中消费消息的终端或服务。 6.Consumer group:   high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。 7.replica:   partition 的副本,保障 partition 的高可用。 8.leader:   replica 中的一个角色, producer 和 consumer 只跟 leader 交互。 9.follower:   replica 中的一个角色,从 leader 中复制数据。 10.controller:   kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。 12.zookeeper:   kafka 通过 zookeeper 来存储集群的 meta 信息。
复制代码

2.3 zookeeper 节点

kafka 在 zookeeper 中的存储结构如下图所示:

 

图.2

 

三、producer 发布消息

3.1 写入方式

producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。

3.2 消息路由

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

1. 指定了 patition,则直接使用; 2. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition 3. patition 和 key 都未指定,使用轮询选出一个 patition。

 附上 java 客户端分区源码,一目了然:

复制代码
//创建消息实例 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {      if (topic == null)           throw new IllegalArgumentException("Topic cannot be null");      if (timestamp != null && timestamp < 0)           throw new IllegalArgumentException("Invalid timestamp " + timestamp);      this.topic = topic;      this.partition = partition;      this.key = key;      this.value = value;      this.timestamp = timestamp; }  //计算 patition,如果指定了 patition 则直接使用,否则使用 key 计算 private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {      Integer partition = record.partition();      if (partition != null) {           List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());           int lastPartition = partitions.size() - 1;           if (partition < 0 || partition > lastPartition) {                throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));           }           return partition;      }      return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }  // 使用 key 选取 patition public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {      List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);      int numPartitions = partitions.size();      if (keyBytes == null) {           int nextValue = counter.getAndIncrement();           List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);           if (availablePartitions.size() > 0) {                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();                return availablePartitions.get(part).partition();           } else {                return DefaultPartitioner.toPositive(nextValue) % numPartitions;           }      } else {           //对 keyBytes 进行 hash 选出一个 patition           return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;      } }
复制代码

3.3 写入流程

 producer 写入消息序列图如下所示:

图.3

流程说明:

复制代码
1. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader 2. producer 将消息发送给该 leader 3. leader 将消息写入本地 log 4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK 5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
复制代码

3.4 producer delivery guarantee

 一般情况下存在三种情况:

1. At most once 消息可能会丢,但绝不会重复传输 2. At least one 消息绝不会丢,但可能会重复传输 3. Exactly once 每条消息肯定会被传输一次且仅传输一次

当 producer 向 broker 发送消息时,一旦这条消息被 commit,由于 replication 的存在,它就不会丢。但是如果 producer 发送数据给 broker 后,遇到网络问题而造成通信中断,那 Producer 就无法判断该条消息是否已经 commit。虽然 Kafka 无法确定网络故障期间发生了什么,但是 producer 可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了 Exactly once,但目前还并未实现。所以目前默认情况下一条消息从 producer 到 broker 是确保了 At least once,可通过设置 producer 异步发送实现At most once。

 

四、broker 保存消息

4.1 存储方式

物理上把 topic 分成一个或多个 patition(对应 server.properties 中的 num.partitions=3 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件),如下:

 

图.4

4.2 存储策略

无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:

1. 基于时间:log.retention.hours=168 2. 基于大小:log.retention.bytes=1073741824

需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

4.3 topic 创建与删除

4.3.1 创建 topic

创建 topic 的序列图如下所示:

图.5

流程说明:

复制代码
1. controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。 2. controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition: 	2.1 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR 	2.2 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state 3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。
复制代码

4.3.2 删除 topic

删除 topic 的序列图如下所示:

图.6

流程说明:

1. controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。 2. 若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。

 

五、kafka HA

5.1 replication

如图.1所示,同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N)。没有 replica 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入replication 之后,同一个 partition 可能会有多个 replica,而这时需要在这些 replica 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从 leader 中复制数据。

Kafka 分配 Replica 的算法如下:

1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序 2. 将第 i 个 partition 分配到第(i mod n)个 broker 上 3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上

5.2 leader failover

当 partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader。在选举新leader时,一个基本的原则是,新的 leader 必须拥有旧 leader commit 过的所有消息。

kafka 在 zookeeper 中(/brokers/.../state)动态维护了一个 ISR(in-sync replicas),由3.3节的写入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成员才能选为 leader。对于 f+1 个 replica,一个 partition 可以在容忍 f 个 replica 失效的情况下保证消息不丢失。

当所有 replica 都不工作时,有两种可行的方案:

1. 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。 2. 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。

kafka 0.8.* 使用第二种方式。

kafka 通过 Controller 来选举 leader,流程请参考5.3节。

5.3 broker failover

kafka broker failover 序列图如下所示:

图.7

流程说明: 

复制代码
1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch 2. controller 从 /brokers/ids 节点读取可用broker 3. controller决定set_p,该集合包含宕机 broker 上的所有 partition 4. 对 set_p 中的每一个 partition     4.1 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR     4.2 决定新 leader(如4.3节所描述)     4.3 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点 5. 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令
复制代码

5.4 controller failover

 当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。

当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作:

复制代码
1. 读取并增加 Controller Epoch。 2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。 3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。 4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。 5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。 6. 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。 7. 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。 8. 启动 replicaStateMachine 和 partitionStateMachine。 9. 将 brokerState 状态设置为 RunningAsController。 10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。 11. 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。 12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。
复制代码

 

6. consumer 消费消息

6.1 consumer API

kafka 提供了两套 consumer API:

1. The high-level Consumer API 2. The SimpleConsumer API

 其中 high-level consumer API 提供了一个从 kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多地关注细节。

6.1.1 The high-level consumer API

high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。

使用 high-level consumer API 可以是多线程的应用,应当注意:

1. 如果消费线程大于 patition 数量,则有些线程将收不到消息 2. 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息 3. 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的

6.1.2 The SimpleConsumer API

如果你想要对 patition 有更多的控制权,那就应该使用 SimpleConsumer API,比如:

1. 多次读取一个消息 2. 只消费一个 patition 中的部分消息 3. 使用事务来保证一个消息仅被消费一次

 但是使用此 API 时,partition、offset、broker、leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作:

1. 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息 2. 应用程序需要通过程序获知每个 Partition 的 leader 是谁 3. 需要处理 leader 的变更

 使用 SimpleConsumer API 的一般流程如下:

复制代码
1. 查找到一个“活着”的 broker,并且找出每个 partition 的 leader 2. 找出每个 partition 的 follower 3. 定义好请求,该请求应该能描述应用程序需要哪些数据 4. fetch 数据 5. 识别 leader 的变化,并对之作出必要的响应
复制代码

以下针对 high-level Consumer API 进行说明。

6.2 consumer group

如 2.2 节所说, kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。

kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 spark/Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 consumer group。如下图所示:

 

图.8

6.3 消费方式

consumer 采用 pull 模式从 broker 中读取数据。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

6.4 consumer delivery guarantee

如果将 consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit。如果只讨论这一读取消息的过程,那 Kafka 确保了 Exactly once。

但实际使用中应用程序并非在 consumer 读取完数据就结束了,而是要进行进一步处理,而数据处理与 commit 的顺序在很大程度上决定了consumer delivery guarantee:

复制代码
1.读完消息先 commit 再处理消息。     这种模式下,如果 consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于 At most once 2.读完消息先处理再 commit。     这种模式下,如果在处理完消息之后 commit 之前 consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。 3.如果一定要做到 Exactly once,就需要协调 offset 和实际操作的输出。     精典的做法是引入两阶段提交。如果能让 offset 和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer 拿到数据后可能把数据放到 HDFS,如果把最新的 offset 和数据本身一起写到 HDFS,那就可以保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,无法存于HDFS,而SimpleConsuemr API的 offset 是由自己去维护的,可以将之存于 HDFS 中)
复制代码

总之,Kafka 默认保证 At least once,并且允许通过设置 producer 异步提交来实现 At most once(见文章《kafka consumer防止数据丢失》)。而 Exactly once 要求与外部存储系统协作,幸运的是 kafka 提供的 offset 可以非常直接非常容易得使用这种方式。

更多关于 kafka 传输语义的信息请参考《Message Delivery Semantics》。

6.5 consumer rebalance

当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance。consumer rebalance算法如下:

复制代码
1. 将目标 topic 下的所有 partirtion 排序,存于PT 2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci 3. N=size(PT)/size(CG),向上取整 4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始) 5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci
复制代码

在 0.8.*版本,每个 consumer 都只负责调整自己所消费的 partition,为了保证整个consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它所有其它 consumer 也应该同时触发 rebalance。这会导致以下几个问题:

复制代码
1.Herd effect   任何 broker 或者 consumer 的增减都会触发所有的 consumer 的 rebalance 2.Split Brain   每个 consumer 分别单独通过 zookeeper 判断哪些 broker 和 consumer 宕机了,那么不同 consumer 在同一时刻从 zookeeper 看到的 view 就可能不一样,这是由 zookeeper 的特性决定的,这就会造成不正确的 reblance 尝试。 3. 调整结果不可控   所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,这可能会导致 kafka 工作在一个不正确的状态。
复制代码

基于以上问题,kafka 设计者考虑在0.9.*版本开始使用中心 coordinator 来控制 consumer rebalance,然后又从简便性和验证要求两方面考虑,计划在 consumer 客户端实现分配方案。(见文章《Kafka Detailed Consumer Coordinator Design》和《Kafka Client-side Assignment Proposal》),此处不再赘述。

 

七、注意事项

7.1 producer 无法发送消息的问题

最开始在本机搭建了kafka伪集群,本地 producer 客户端成功发布消息至 broker。随后在服务器上搭建了 kafka 集群,在本机连接该集群,producer 却无法发布消息到 broker(奇怪也没有抛错)。最开始怀疑是 iptables 没开放,于是开放端口,结果还不行(又开始是代码问题、版本问题等等,倒腾了很久)。最后没办法,一项一项查看 server.properties 配置,发现以下两个配置:

复制代码
# The address the socket server listens on. It will get the value returned from  # java.net.InetAddress.getCanonicalHostName() if not configured. #   FORMAT: #     listeners = security_protocol://host_name:port #   EXAMPLE: #     listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://:9092

 # Hostname and port the broker will advertise to producers and consumers. If not set, 
 # it uses the value for "listeners" if configured. Otherwise, it will use the value
 # returned from java.net.InetAddress.getCanonicalHostName().
 #advertised.listeners=PLAINTEXT://your.host.name:9092

复制代码

以上说的就是 advertised.listeners 是 broker 给 producer 和 consumer 连接使用的,如果没有设置,就使用 listeners,而如果 host_name 没有设置的话,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主机名。

修改方法:

1. listeners=PLAINTEXT://121.10.26.XXX:9092 2. advertised.listeners=PLAINTEXT://121.10.26.XXX:9092

修改后重启服务,正常工作。关于更多 kafka 配置说明,见文章《Kafka学习整理三(borker(0.9.0及0.10.0)配置)》。

 

八、参考文章

1. 《Kafka剖析(一):Kafka背景及架构介绍

2. 《Kafka设计解析(二):Kafka High Availability (上)

3. 《Kafka设计解析(二):Kafka High Availability (下)

4. 《Kafka设计解析(四):Kafka Consumer解析

5. 《Kafka设计解析(五):Kafka Benchmark

6. 《Kafka学习整理三(borker(0.9.0及0.10.0)配置)

7. 《Using the High Level Consumer

8. 《Using SimpleConsumer

9. 《Consumer Client Re-Design

10. 《Message Delivery Semantics

11. 《Kafka Detailed Consumer Coordinator Design

12. 《Kafka Client-side Assignment Proposal

13. 《Kafka和DistributedLog技术对比

14. 《kafka安装和启动

15. 《kafka consumer防止数据丢失

  

 

作者:cyfonly
本文版权归作者和博客园共有,欢迎转载,未经同意须保留此段声明,且在文章页面明显位置给出原文连接。欢迎指正与交流。
posted @ 2017-04-28 10:37 xzc 阅读(61) | 评论 (0)编辑 收藏
仅列出标题  下一页