IT星球论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

新浪微博账号登陆

只需一步,快速开始

搜索
查看: 110|回复: 0

mycat服务启动{管理模块启动过程}

[复制链接]

1997

主题

1

好友

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

优秀会员 助人为乐 辛勤工作 技术精英 多才多艺 优秀班竹 灌水天才 星球管理 宣传大使 灌水之王 财富勋章 版主勋章 动漫勋章 勤奋会员 论坛精英 PS高手 心 8 闪游皮肤 双鱼座 8★8➹ 志愿者 乖

发表于 2016-2-18 12:23:11 |显示全部楼层
mycat服务启动{管理模块启动过程}
mycat启动的时候启动了三个模块1:NIOConnector(负责链接MySQL数据库,连接池以数据库为准不以链接字符串为准),
1:NIOAcceptor,ManagerConnectionFactory(管理模块,默认端口9066)
2:NIOAcceptor,ServerConnectionFactory(mysql服务模块,默认端口8066)
这里介绍下管理模块的启动流程
顺序图
NIO和AIOmycat分别实现了NIO和AIO,由于linux当前没有真正实现AIO这里主要介绍NIO的流程。
NIO的Reactor与AIO的Proactor两种模式的场景区别:
下面是Reactor的做法:
1.        等待事件响应 (Reactor job)
2.        分发 “Ready-to-Read” 事件给用户句柄 ( Reactor job)
3.        读数据 (user handler job)
4.        处理数据( user handler job)
下面再来看看真正意义的异步模式Proactor是如何做的:
1.        等待事件响应 (Proactor job)
2.        读数据 (Proactor job)
3.        分发 “Read-Completed” 事件给用户句柄 (Proactor job)
4.        处理数据(user handler job)
mycat的NIO实现
Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。
Selector可以监听四种不同类型的事件:
- Connect
- Accept
- Read
- Write
这四种事件用SelectionKey的四个常量来表示:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
前面已经说了,NIO采用的Reactor模式:例如汽车是乘客访问的主体(Reactor),乘客上车后,到售票员(acceptor)处登记,之后乘客便可以休息睡觉去了,当到达乘客所要到达的目的地后,售票员将其唤醒即可。

核心顺序

mycat管理端的启动流程1:new ManagerConnectionFactory extends FrontendConnectionFactory
2:new NIOReactorPool,new NIOReactor,new RW中new ConcurrentLinkedQueue()而AbstractConnection中new NIosocketWR
3:new NIOAcceptor中向反应堆中注册了OP_ACCEPT,该类继承了Thread然后start启动
accept                        channel = serverChannel.accept();                        channel.configureBlocking(false);                        FrontendConnection c = factory.make(channel);                        c.setAccepted(true);                        c.setId(ID_GENERATOR.getId());                        NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()                                        .nextProcessor();                        c.setProcessor(processor);                        LOGGER.info("accept");                        NIOReactor reactor = reactorPool.getNextReactor();                        reactor.postRegister(c);
factory.make(channel):最终构造了ManagerQueryHandler(管理命令解析器)和FrontendAuthenticator(mycat权限解析器)
reactor.postRegister(c):把当前链接添加到reactor的registerQueue中并唤醒reactor的selector
read在NIOReactor的registerQueue为空的时候run循环空运转,当上一步把accept的链接放到队列的时候则
                        for (;;) {                                ++reactCount;                                try {                                        selector.select(500L);                                        register(selector);                                        keys = selector.selectedKeys();                                        for (SelectionKey key : keys) {                                                AbstractConnection con = null;                                                try {                                                        Object att = key.attachment();                                                        if (att != null) {                                                                con = (AbstractConnection) att;                                                                if (key.isValid() && key.isReadable()) {                                                                        try {                                                                                con.asynRead();                                                                        } catch (IOException e) {                                        con.close("program err:" + e.toString());                                                                                continue;                                                                        } catch (Exception e) {                                                                                LOGGER.debug("caught err:", e);                                                                                con.close("program err:" + e.toString());                                                                                continue;                                                                        }                                                                }                                                                if (key.isValid() && key.isWritable()) {                                                                        con.doNextWriteCheck();                                                                }                                                        } else {                                                                key.cancel();                                                        }                        } catch (CancelledKeyException e) {                            if (LOGGER.isDebugEnabled()) {                                LOGGER.debug(con + " socket key canceled");                            }                        } catch (Exception e) {                            LOGGER.warn(con + " " + e);                        }                                        }                                } catch (Exception e) {                                        LOGGER.warn(name, e);                                } finally {                                        if (keys != null) {                                                keys.clear();                                        }                                }
register(selector);也即
((NiOSocketWR) c.getSocketWR()).register(selector); 注册OP_READ事件
                                        c.register();即FrontendConnection的register发送握手数据包
con.asynRead();即NIOSocketWR的asynRead即
        public void asynRead() throws IOException {                LOGGER.info("asynRead");                ByteBuffer theBuffer = con.readBuffer;                if (theBuffer == null) {                        theBuffer = con.processor.getBufferPool().allocate();                        con.readBuffer = theBuffer;                }                int got = channel.read(theBuffer);                con.onReadData(got);        }
con.onReadData(got);即AbstractConnection的onReadData这里拆包得到完成的数据包后调用
handler.handle(data);也即FrontendAuthenticator的handle在这里check user;check password;check schema如果失败则将失败信息写入缓冲区,如果成功
则把AbstractConnection的默认hander从FrontendAuthenticator换成FrontendCommandHandler等待接下来的处理(比如show命令等,
以上的处理是发生在输入mysql -utest -ptest -h10.97.177.83 -P9066时)
认证完成后下一次的handler.handle(data)则使用FrontendCommandHandler的handle来处理也即
    public void handle(byte[] data)    {        if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())        {            MySQLMessage mm = new MySQLMessage(data);            int  packetLength = mm.readUB3();            if(packetLength+4==data.length)            {                source.loadDataInfileData(data);            }            return;        }        switch (data[4])        {            case MySQLPacket.COM_INIT_DB:                commands.doInitDB();                source.initDB(data);                break;            case MySQLPacket.COM_QUERY:                commands.doQuery();                source.query(data);                break;            case MySQLPacket.COM_PING:                commands.doPing();                source.ping();                break;            case MySQLPacket.COM_QUIT:                commands.doQuit();                source.close("quit cmd");                break;            case MySQLPacket.COM_PROCESS_KILL:                commands.doKill();                source.kill(data);                break;            case MySQLPacket.COM_STMT_PREPARE:                commands.dostmtPrepare();                source.stmtPrepare(data);                break;            case MySQLPacket.COM_STMT_EXECUTE:                commands.doStmtExecute();                source.stmtExecute(data);                break;            case MySQLPacket.COM_STMT_CLOSE:                commands.doStmtClose();                source.stmtClose(data);                break;            case MySQLPacket.COM_HEARTBEAT:                commands.doHeartbeat();                source.heartbeat(data);                break;            default:                     commands.doOther();                     source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,                             "Unknown command");        }    }
source.query(data);即queryHandler.query(sql);这里的queryHandler是ManagerQueryHandler即
    public void query(String sql) {        ManagerConnection c = this.source;        if (LOGGER.isDebugEnabled()) {            LOGGER.debug(new StringBuilder().append(c).append(sql).toString());        }        int rs = ManagerParse.parse(sql);        switch (rs & 0xff) {            case ManagerParse.SELECT:                SelectHandler.handle(sql, c, rs >>> SHIFT);                break;            case ManagerParse.SET:                c.write(c.writeToBuffer(OkPacket.OK, c.allocate()));                break;            case ManagerParse.SHOW:                ShowHandler.handle(sql, c, rs >>> SHIFT);                break;            case ManagerParse.SWITCH:                SwitchHandler.handler(sql, c, rs >>> SHIFT);                break;            case ManagerParse.KILL_CONN:                KillConnection.response(sql, rs >>> SHIFT, c);                break;            case ManagerParse.OFFLINE:                Offline.execute(sql, c);                break;            case ManagerParse.ONLINE:                Online.execute(sql, c);                break;            case ManagerParse.STOP:                StopHandler.handle(sql, c, rs >>> SHIFT);                break;            case ManagerParse.RELOAD:                ReloadHandler.handle(sql, c, rs >>> SHIFT);                break;            case ManagerParse.ROLLBACK:                RollbackHandler.handle(sql, c, rs >>> SHIFT);                break;            case ManagerParse.CLEAR:                ClearHandler.handle(sql, c, rs >>> SHIFT);                break;            case ManagerParse.CONFIGFILE:                ConfFileHandler.handle(sql, c);                break;            case ManagerParse.LOGFILE:                ShowServerLog.handle(sql, c);                break;            default:                c.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement");        }    }
总结mycat的网络处理逻辑上是通过队列加上后台线程来实现了accept和read的解耦从而实现了高性能,但是代码写的就不敢恭维。
http://www.myext.cn/c/a_13455.html
mycat服务启动{管理模块启动过程}

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册 新浪微博账号登陆

该会员没有填写今日想说内容.
您需要登录后才可以回帖 登录 | 立即注册 新浪微博账号登陆

回顶部