IT星球论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

新浪微博账号登陆

只需一步,快速开始

搜索
查看: 238|回复: 0

CloudFoundry之DEA源码分析

[复制链接]

2000

主题

1

好友

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

优秀会员 助人为乐 辛勤工作 技术精英 多才多艺 优秀班竹 灌水天才 星球管理 宣传大使 灌水之王 财富勋章 版主勋章 动漫勋章 勤奋会员 论坛精英 PS高手 心 8 闪游皮肤 双鱼座 8★8➹ 志愿者 乖

发表于 2016-1-11 12:14:14 |显示全部楼层
CloudFoundry之DEA源码分析
DEA启动分析



Ruby的代码结构一般都是一样的:

bin目录是执行文件
conf是配置文件
lib则是ruby source
spec放置一些测试的文件
Gemfile+Gemfile.lock为bundler服务,让dea可以找到正确的gem依赖
Rakefile是用来做测试的

先看下dea的目录结构,bin 放的是脚本,等同shell的作用,脚本引用的是lib/dea.rb,而lib/dea.rb接着引用/lib/dea/agent.rb来完成请求响应,lib负责了所有的逻辑业务,可以说lib是工程的核心,

config放配置文件,spec是一些test unit.  我们重点来看lib/dea.rb和agent.rb

1. lib/dea.rb

IT论坛不用关注,就是接收参数,处理参数.来看最后几句:

[size=0.95em]EM.epollEM.run {  agent = DEA::Agent.new(config)  agent.run()}

普及下EventMachine http://blog.csdn.net/resouer/article/details/7975550
EventMachine 开启epoll模式,默认为select,epoll比select高效,原因见这篇文章:http://eventmachine.rubyforge.org/docs/EPOLL.html,
CF中几乎所有的组件启动时,都用EM,所有还是有必要搞清楚.这里EM.run就是开启了这个I/O,整个启动操作都在EM中进行,为什么不用ruby的多线程?
这两句代码不用解释了,初始化Agent,然后执行run.赶紧来看run方法都做了些什么操作,由于代码比较多,我们挑重点来分析,但基本流程不会省.

1. 准备文件目录,以及处理参数

[size=0.95em]     # Make sure all the correct directories exist.      begin        FileUtils.mkdir_p(@droplet_dir)        FileUtils.mkdir_p(@staged_dir)        FileUtils.mkdir_p(@apps_dir)        FileUtils.mkdir_p(@db_dir)        if @secure # Allow traversal by secure users          FileUtils.chmod(0711, @apps_dir)          FileUtils.chmod(0711, @droplet_dir)        end      rescue => e        @logger.fatal("Can't create support directories: #{e}")        exit 1      end

2. 循环地开启file_viewer服务,web服务器用thin,这里用到了EM.next_tick,指的是不用马上执行,EM自已调度,空闲的时候执行.因为这一步不太重要,让主进程让给更重要的事情吧.

[size=0.95em] EM.next_tick do        unless start_file_viewer          # Periodically try to start the file viewer in case of port contention          @filer_start_timer = EM.add_periodic_timer(1) do            if start_file_viewer              EM.cancel_timer(@filer_start_timer)              @filer_start_timer = nil            end          end        end      end

3. EM定义了两个错误处理回调方法,暂时触发不了

[size=0.95em]#James 定义NATS的回调行为,这里是在NATS发生错误时,将droplet ID列表保存在db/application.json中,恢复时用.      #默认recovered_droplets=false      NATS.on_error do |e|        @logger.error("EXITING! NATS error: #{e}")        @logger.error(e)        # Only snapshot app state if we had a chance to recover saved state. This prevents a connect error        # that occurs before we can recover state from blowing existing data away.        snapshot_app_state if @recovered_droplets        exit!      end      EM.error_handler do |e|        @logger.error "Eventmachine problem, #{e}"        @logger.error(e)      end

4.清理资源,理解为资源回收.

[size=0.95em] # Calculate how much disk is available before we respond to any messages      update_droplet_fs_usage(:blocking => true)

5. 接下来这个比较大头.

[size=0.95em]NATS.start(:uri => @nats_uri) do        # Register ourselves with the system        status_config = @config['status'] || {}        VCAP::Component.register(:type => 'DEA',                           :host => @local_ip,                           :index => @config['index'],                           :config => @config,                           :port => status_config['port'],                           :user => status_config['user'],                           :password => status_config['password'])        uuid = VCAP::Component.uuid        # Setup our identity        @hello_message = { :id => uuid, :ip => @local_ip, :port => @file_viewer_port, :version => VERSION }.freeze        @hello_message_json = @hello_message.to_json        # Setup our listeners..        NATS.subscribe('dea.status') { |msg, reply| process_dea_status(msg, reply) }        NATS.subscribe('droplet.status') { |msg, reply| process_droplet_status(msg, reply) }        NATS.subscribe('dea.discover') { |msg, reply| process_dea_discover(msg, reply) }        NATS.subscribe('dea.find.droplet') { |msg, reply| process_dea_find_droplet(msg, reply) }        NATS.subscribe('dea.update') { |msg| process_dea_update(msg) }        NATS.subscribe('dea.stop') { |msg| process_dea_stop(msg) }        NATS.subscribe("dea.#{uuid}.start") { |msg| process_dea_start(msg) }        NATS.subscribe('router.start') {  |msg| process_router_start(msg) }        NATS.subscribe('healthmanager.start') { |msg| process_healthmanager_start(msg) }        NATS.subscribe('dea.locate') { |msg|  process_dea_locate(msg) }        # Recover existing application state.        recover_existing_droplets        delete_untracked_instance_dirs        EM.add_periodic_timer(@heartbeat_interval) { send_heartbeat }        EM.add_periodic_timer(@advertise_interval) { send_advertise }        EM.add_timer(MONITOR_INTERVAL) { monitor_apps }        EM.add_periodic_timer(CRASHES_REAPER_INTERVAL) { crashes_reaper }        EM.add_periodic_timer(VARZ_UPDATE_INTERVAL) { snapshot_varz }        EM.add_periodic_timer(DROPLET_FS_PERCENT_USED_UPDATE_INTERVAL) { update_droplet_fs_usage }        NATS.publish('dea.start', @hello_message_json)        send_advertise      end

连接上NATS服务,这一步是必须而且优先的,因为CF的通信全靠它了; 然后弄个回调方法,向VCAP注册自已,每个组件启动时都要向VCAP注册自已,全局监控用.至于怎么监控.查看代码以及官方文档,说是注册后,就会给这个组件启个服务(thin server),用于CF周期性地发http请求来知晓各组件状态,http URL 分为两个,一个是/healthz,简单地返回OK不OK; /varz则返回一堆运行时参数.但从源代码上并不是这么回事.不是所有组件都启thin服务,再深入研究吧.

接下来是订阅一堆NATS message,这也是各个组件启动时要做的事情.各个sub_message都有block来处理,有些要返回值,有些不用.

recover_existing_droplets,如果是重启,并开启了恢复机制,这个就是恢复之前运行正常的droplet.当dea 重启时,这些上次正常运行的droplet是如何记录下来的呢,看看第3步snapshot_app_state if @recovered_droplets, 这个是说当nat 出错了或是啥地,把droplets 快照一份保存起来.供下一次重启时读取.

delete_untracked_instance_dirs,删掉那些僵死的或是无法追踪的droplet,然后发布一堆设置一堆timer,周期性地发NATS message,有heartbeat,有monitor message,反正是让IT论坛组件知道些事情.关于组件间的NATS message交互,请阅读http://apidocs.cloudfoundry.com/的NATS message,发布dea.start,是发给cc的,告诉它,已经准备好了,随时可以接收上级指示.

send_advertise,就是发布dea.advertise,也是给CC的.

总结, dea 启动时,向vcap注册自已,然后订阅,发布消息. 要知道dea都有哪些功能,其它就看他的NATS message就知道了. 参考http://apidocs.cloudfoundry.com/ 的NATS message


DEA功能点分析

1. 启动实例
对应的nats message是 dea.uuid.start。流程最为复杂。

(1)简单说就是先计算资源够不够用,
(2)然后添加至droplets数组中,给IT论坛组件发信息时用到,还有就是快照,dea挂掉重启时,根据快照的droplet id来恢复各droplet。
(3)根据收到的message,去download droplet。这里有cache和mount的处理判断。如果本地已经down下来,或是dea server跟cc的stager目录有mount,直接就可以搞。
(4)判断runtime是不是符合,如果配置文件里没说明到位,还有个runtime自发现的过程。
(5)droplet拉回来了,runtime也OK了,就安排启动需要的端口之类的,看看有没什么要先执行的。没有的话,就export一堆env,再执行startup

2. 停止实例
对应的nats message是 dea.stop。 回收资源,删掉droplet,从droplets数组中剔除等。

3. 启动dea
调用/lib/dea.rb===>agent.run(),见本文的第一部分详解。

4. 停止dea
调用vcap_dev stop dea ==>vcap ==>vcap_components.stop,其实就是直接kill -9,还有几个kill TEAM之类的,代码里埋了一些触发点 trap("xx").

5. 更新droplet
对应nats message是 dea.update,看代码里只是更新uris,就是域名

6. 查看droplet信息
对应dea.find.droplet。返回droplet的详细信息,包括droplet所处服务器的ip,port,log路径之类的。CC可以据此拼出log地址,从而读取。

7. 发送心跳
向hm汇报此dea的所有droplet情况,必须全部的droplet都要汇报。hm拿这些数据跟CCD里的数据相比较,如果有不一致的情况,说明有问题,告知cc,cc会做相应的操作。

8. 向router注册
向router汇报此dea的所有droplet情况,必须全部的droplet都要汇报,router根据此来判断droplet是否正常(一定时间内无消息返回,就视为无效,下回请求就不会路由过去了)。

9. 回应cc的discover请求
当cc在cache里找不到合适的dea启动instance,就发个discover广播,所有dea都订阅,而且设置了立马reply。各DEA视自已的资源是否满足要求,满足的话就回个话。CC用收到的第一个dea来启动实例。dea回话的时候,有个延迟策略,即计算自身的已启动的instance数量,内存使用量,延迟一定时间再回话,不当出头鸟,但最大延迟是0.25秒。

DEA资源隔离

1. 先介绍当前vcap的资源隔离方法

首先,要开启secure模式,在config/dea.yml里有设置,同时必须是以root权限启的vcap。
vcap会选择一个安全用户来启动这个droplet,但一般是没有的,vcap会自动给创建,这些用户是以“vcap_user”开头的,没有home目录,没有密码之类的。默认的user group是“vcap_group”,查看secure.create_default_group方法:

[size=0.95em]    def create_default_group      # Add in default group      cmd = "addgroup --system #{DEFAULT_SECURE_GROUP} > /dev/null 2<&1" if islinux      cmd = "dscl . -create /Groups/#{DEFAULT_SECURE_GROUP} PrimaryGroupID #{SECURE_UID_BASE}" if isMacOSX      system(cmd)    end   

[size=0.95em]    args = [          "adduser",          "--firstuid %d" % (SECURE_UID_BASE + 1),          "--ingroup %s" % DEFAULT_SECURE_GROUP,          "--home /nonexistent",          "--no-create-home",          "--shell /bin/sh",          "--disabled-login",          "--gecos ''",          "--quiet",          username,        ]   

然后,选一个用户,记录在instance属性中,instance:user=user,把相关的目录权限赋给这个用户,并且把fetch回来的droplet所在的目录及文件权限,所属者等,都给此用户。最后,启动的时候,以此用户启动,带上一些ulimit参数来限制这个droplet使用的资源。

[size=0.95em]    if @secure || @enforce_ulimit            process.send_data("renice 0 $$\n")            process.send_data("ulimit -m #{mem_kbytes} 2> /dev/null\n")  # ulimit -m takes kb, soft enforce            process.send_data("ulimit -v 3000000 2> /dev/null\n") # virtual memory at 3G, this will be enforced            process.send_data("ulimit -n #{num_fds} 2> /dev/null\n")            process.send_data("ulimit -u 512 2> /dev/null\n") # processes/threads            process.send_data("ulimit -f #{disk_limit} 2> /dev/null\n") # File size to complete disk usage            process.send_data("umask 077\n")          end   

同时我们发现在非安全模式下,是没有任何限制的,droplet可以随便占满内存,用尽cpu,霸占磁盘空间...。这里解释几个参数吧,-m 是可使用最大内存,-v 最大虚拟内存,-n 一次最大的文件打开数量,-u 线程数, -f 单次打开的最大文件,不是磁盘限制。

用ulimit方式,可以简单地实现资源隔离,但是有很大的问题。

(1) kill,当进程的资源超过限制,此进程会被系统无情kill掉。会被health manager知道,health manager告知cc,cc会找个dea来启动,某种条件下,会导致无休止的重启--挂-重启--挂。
(2) 没有真正做到各种资源隔离,就像vm层面的一样。如cpu,io,namespace等。
(3) 继续添加。。。

资源隔离是多么重要,所有dea将这个功能单独移出来,放在另外的github目录上: https://github.com/cloudfoundry/warden

warden是采用cgroup:http://en.wikipedia.org/wiki/Cgroups ,用vm代价太大,用LXC也是,同时CF想将wardean做到可移值。可以说cgroup是个轻量级的虚拟化。简洁,不会像vm那么复杂,同时拥有跟宿主机一样的速度。具体的介绍请看源码,warden的安装使用将参见:

warden 安装 http://blog.csdn.net/k_james/article/details/8523864

warden使用与原理分析 http://blog.csdn.net/k_james/article/details/8523934

代码难点列举在CF中,eventmachine和Fiber很受欢迎。

1. EM.run
这个在第一部分也讲过,有两篇文章不错,建议看看。EM.run说明包含在内的动作都是在这个框架下执行,具有很高的并发量等好处。

[size=0.95em]    #指定用epoll模式,而不是select或其它,可能在不久的将来会成为默认。    EM.epoll    EM.run {      agent = DEA::Agent.new(config)      agent.run()    }   

2. EM.system
EM.run 内部,若又来个EM.system,意思是EM.defer,这段代码很核心,又要执行很久,放到后台去。如果后面还有IT论坛逻辑要做,就不能用EM.system了,简单用system。

[size=0.95em]   EM.system('/bin/sh', du_proc, cont_proc)   

3. EM.next_tick do
也在EM.run内部,意思是这段代码不是很着急执行,可以交给进程自己调度,有空了再执行。

[size=0.95em] EM.next_tick do        unless start_file_viewer          # Periodically try to start the file viewer in case of port contention          @filer_start_timer = EM.add_periodic_timer(1) do            if start_file_viewer              EM.cancel_timer(@filer_start_timer)              @filer_start_timer = nil            end          end        end      end

4. Bundler.with_clean_env
这个是说,要在ruby进程中再开个进程,但又不受ruby进程的影响,Bundler.with_clean_env会清空ENV,相当于可以在一段干净的空间里做些事情。
跟EM.system配合使用,效果更佳。

[size=0.95em]    Bundler.with_clean_env { EM.system("#{@dea_ruby} -- #{prepare_script} true #{sh_command}", exec_operation, exit_operation) }   

5. Fiber
这个类似线程,但不同的是它可以挂起,暂停等操作。

[size=0.95em]   f = Fiber.new do   ...   ...   r.resume   r.yelid   




本文到此结束,有兴趣的欢迎一起研究讨论,微博:http://weibo.com/kingjames3
来源:http://www.csdn123.com/html/mycs ... d8a3f208c98feb.html
CloudFoundry之DEA源码分析

该会员没有填写今日想说内容.
您需要登录后才可以回帖 登录 | 立即注册 新浪微博账号登陆

回顶部