IT星球论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

新浪微博账号登陆

只需一步,快速开始

搜索
查看: 249|回复: 0

深入剖析SolrCloud(四)

[复制链接]

2004

主题

1

好友

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

优秀会员 助人为乐 辛勤工作 技术精英 多才多艺 优秀班竹 灌水天才 星球管理 宣传大使 灌水之王 财富勋章 版主勋章 动漫勋章 勤奋会员 论坛精英 PS高手 心 8 闪游皮肤 双鱼座 8★8➹ 志愿者 乖

发表于 2016-3-3 12:20:37 |显示全部楼层
深入剖析solrCloud(四)
上一篇中介绍了连接Zookeeper集群的方法,这一篇将围绕一个有趣的话题---来展开,这就是Replication(索引复制),关于Solr Replication的详细介绍,可以参考http://wiki.apache.org/solr/SolrReplication           在开始这个话题之前,先从我最近在应用中引入solrmaster/slave架构时,遇到的一个让我困扰的实际问题。
  
应用场景简单描述如下:
  
1)首先master节点下载索引分片,然后创建配置文件,加入master节点的replication配置片段,再对索引分片进行合并(关于mergeIndex,可以参考http://wiki.apache.org/solr/MergingSolrIndexes),然后利用上述配置文件和索引数据去创建一个solr核。
  
2)slave节点创建配置文件,加入slave节点的replication配置片段,创建一个空的solr核,等待从master节点进行索引数据同步
  
出现的问题:slave节点没有从master节点同步到数据。
  
问题分析:
  
1)首先检查master节点,获取最新的可复制索引的版本号,
  
[url=]http://master_host:port/solr/replication?command=indexversion[/url]
  
发现返回的索引版本号是0,这说明mater节点根本没有触发replication动作,
  
2)为了确认上述判断,在slave节点上进一步查看replication的详细信息
  
[url=]http://slave_host:port/solr/replication?command=details[/url]
  
发现确实如此,尽管master节点的索引版本号和slave节点的索引版本号不一致,但索引却没有同步过来,再分别查看master节点和slave节点的日志,发现索引复制动作确实没有开始。
  
综上所述,确实是master节点没有触发索引复制动作,那究竟是为何呢?先将原因摆出来,后面会通过源码的分析来加以说明。
  
原因:solr合并索引时,不管你是通过mergeindexes的http命令,还是调用底层lucene的IndexWriter,记得最后一定要提交一个commit,否则,不仅索引不仅不会对查询可见,更是对于master/slave架构的solr集群来说,master节点的replication动作不会触发,因为indexversion没有感知到变化。
           好了,下面开始对SolrReplication的分析
           Solr容器在加载solr核的时候,会对已经注册的各个实现SolrCoreAware接口的Handler进行回调,调用其inform方法。
           对于ReplicationHandler来说,就是在这里对自己是属于master节点还是slave节点进行判断,若是slave节点,则创建一个Snappuller对象,定时负责从master节点主动拉索引数据下来;若是master节点,则只设置相应的参数。
[url=][/url]
  public void inform(SolrCore core) {
    this.core = core;
    registerFileStreamResponseWriter();
    registerCloseHook();
    NamedList slave = (NamedList) initArgs.get("slave");
    boolean enableSlave = isEnabled( slave );
    if (enableSlave) {
      tempSnapPuller = snapPuller = new SnapPuller(slave, this, core);
      isSlave = true;
    }
    NamedList master = (NamedList) initArgs.get("master");
    boolean enableMaster = isEnabled( master );
   
    if (!enableSlave && !enableMaster) {
      enableMaster = true;
      master = new NamedList<Object>();
    }
   
    if (enableMaster) {
      includeConfFiles = (String) master.get(CONF_FILES);
      if (includeConfFiles != null && includeConfFiles.trim().length() > 0) {
        List<String> files = Arrays.asList(includeConfFiles.split(","));
        for (String file : files) {
          if (file.trim().length() == 0) continue;
          String[] strs = file.split(":");
          // if there is an alias add it or it is null
          confFileNameAlias.add(strs[0], strs.length > 1 ? strs[1] : null);
        }
        LOG.info("Replication enabled for following config files: " + includeConfFiles);
      }
      List backup = master.getAll("backupAfter");
      boolean backupOnCommit = backup.contains("commit");
      boolean backupOnOptimize = !backupOnCommit && backup.contains("optimize");
      List replicateAfter = master.getAll(REPLICATE_AFTER);
      replicateOnCommit = replicateAfter.contains("commit");
      replicateOnOptimize = !replicateOnCommit && replicateAfter.contains("optimize");

      if (!replicateOnCommit && ! replicateOnOptimize) {
        replicateOnCommit = true;
      }
      
      // if we only want to replicate on optimize, we need the deletion policy to
      
// save the last optimized commit point.
      if (replicateOnOptimize) {
        IndexDeletionPolicyWrapper wrapper = core.getDeletionPolicy();
        IndexDeletionPolicy policy = wrapper == null ? null : wrapper.getWrappedDeletionPolicy();
        if (policy instanceof SolrDeletionPolicy) {
          SolrDeletionPolicy solrPolicy = (SolrDeletionPolicy)policy;
          if (solrPolicy.getMaxOptimizedCommitsToKeep() < 1) {
            solrPolicy.setMaxOptimizedCommitsToKeep(1);
          }
        } else {
          LOG.warn("Replication can't call setMaxOptimizedCommitsToKeep on " + policy);
        }
      }

      if (replicateOnOptimize || backupOnOptimize) {
        core.getUpdateHandler().registerOptimizeCallback(getEventListener(backupOnOptimize, replicateOnOptimize));
      }
      if (replicateOnCommit || backupOnCommit) {
        replicateOnCommit = true;
        core.getUpdateHandler().registerCommitCallback(getEventListener(backupOnCommit, replicateOnCommit));
      }
      if (replicateAfter.contains("startup")) {
        replicateOnStart = true;
        RefCounted<SolrIndexSearcher> s = core.getNewestSearcher(false);
        try {
          DirectoryReader reader = s==null ? null : s.get().getIndexReader();
          if (reader!=null && reader.getIndexCommit() != null && reader.getIndexCommit().getGeneration() != 1L) {
            try {
              if(replicateOnOptimize){
                Collection<IndexCommit> commits = DirectoryReader.listCommits(reader.directory());
                for (IndexCommit ic : commits) {
                  if(ic.getSegmentCount() == 1){
                    if(indexCommitPoint == null || indexCommitPoint.getGeneration() < ic.getGeneration()) indexCommitPoint = ic;
                  }
                }
              } else{
                indexCommitPoint = reader.getIndexCommit();
              }
            } finally {
              // We don't need to save commit points for replication, the SolrDeletionPolicy
              
// always saves the last commit point (and the last optimized commit point, if needed)
              /***
              if(indexCommitPoint != null){
                core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getGeneration());
              }
              **
*/
            }
          }

          // reboot the writer on the new index
          core.getUpdateHandler().newIndexWriter();

        } catch (IOException e) {
          LOG.warn("Unable to get IndexCommit on startup", e);
        } finally {
          if (s!=null) s.decref();
        }
      }
      String reserve = (String) master.get(RESERVE);
      if (reserve != null && !reserve.trim().equals("")) {
        reserveCommitDuration = SnapPuller.readInterval(reserve);
      }
      LOG.info("Commits will be reserved for  " + reserveCommitDuration);
      isMaster = true;
    }
}
[url=][/url]

  ReplicationHandler可以响应多种命令:
  
1)       indexversion。
  
这里需要了解的第一个概念是索引提交点(IndexCommit),这是底层lucene的东西,可以自行查阅资料。首先获取最新的索引提交点,然后从其中获取索引版本号和索引所属代。
      IndexCommit commitPoint = indexCommitPoint;  // make a copy so it won't change
      if (commitPoint != null && replicationEnabled.get()) {
        core.getDeletionPolicy().setReserveDuration(commitPoint.getVersion(), reserveCommitDuration);
        rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());
        rsp.add(GENERATION, commitPoint.getGeneration());

    2backup。这个命令用来对索引做快照。首先获取最新的索引提交点,然后创建做一个SnapShooter,具体的快照动作由这个对象完成,
[url=][/url]
   private void dosnapShoot(SolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) {
    try {
      int numberToKeep = params.getInt(NUMBER_BACKUPS_TO_KEEP, Integer.MAX_VALUE);
      IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();
      IndexCommit indexCommit = delPolicy.getLatestCommit();
      
      if(indexCommit == null) {
        indexCommit = req.getSearcher().getReader().getIndexCommit();
      }
      
      // small race here before the commit point is saved
      new SnapShooter(core, params.get("location")).createSnapAsync(indexCommit, numberToKeep, this);
      
    } catch (Exception e) {
      LOG.warn("Exception during creating a snapshot", e);
      rsp.add("exception", e);
    }
  }

快照对象会启动一个线程去异步地做一个索引备份。
  
void createSnapAsync(final IndexCommit indexCommit, final int numberToKeep, final ReplicationHandler replicationHandler) {
  
    replicationHandler.core.getDeletionPolicy().saveCommitPoint(indexCommit.getVersion());
  
  
    new Thread() {
  
      @Override
  
      public void run() {
  
        createSnapshot(indexCommit, numberToKeep, replicationHandler);
  
      }
  
    }.start();
  
}
  
  
void createSnapshot(final IndexCommit indexCommit, int numberToKeep, ReplicationHandler replicationHandler) {
  
    NamedList details = new NamedList();
  
    details.add("startTime", new Date().toString());
  
    File snapShotDir = null;
  
    String directoryName = null;
  
    Lock lock = null;
  
    try {
  
      if(numberToKeep<Integer.MAX_VALUE) {
  
        deleteOldBackups(numberToKeep);
  
      }
  
      SimpleDateFormat fmt = new SimpleDateFormat(DATE_FMT, Locale.US);
  
      directoryName = "snapshot." + fmt.format(new Date());
  
      lock = lockFactory.makeLock(directoryName + ".lock");
  
      if (lock.isLocked()) return;
  
      snapShotDir = new File(snapDir, directoryName);
  
      if (!snapShotDir.mkdir()) {
  
        LOG.warn("Unable to create snapshot directory: " + snapShotDir.getAbsolutePath());
  
        return;
  
      }
  
      Collection<String> files = indexCommit.getFileNames();
  
      FileCopier fileCopier = new FileCopier(solrCore.getDeletionPolicy(), indexCommit);
  
      fileCopier.copyFiles(files, snapShotDir);
  
  
      details.add("fileCount", files.size());
  
      details.add("status", "success");
  
      details.add("snapshotCompletedAt", new Date().toString());
  
    } catch (Exception e) {
  
      SnapPuller.delTree(snapShotDir);
  
      LOG.error("Exception while creating snapshot", e);
  
      details.add("snapShootException", e.getMessage());
  
    } finally {
  
      replicationHandler.core.getDeletionPolicy().releaseCommitPoint(indexCommit.getVersion());   
  
      replicationHandler.snapShootDetails = details;
  
      if (lock != null) {
  
        try {
  
          lock.release();
  
        } catch (IOException e) {
  
          LOG.error("Unable to release snapshoot lock: " + directoryName + ".lock");
  
        }
  
      }
  
    }
  
  }
  
3)fetchindex。响应来自slave节点的取索引文件的请求,会启动一个线程来实现索引文件的获取。
  
      String masterUrl = solrParams.get(MASTER_URL);
  
      if (!isSlave && masterUrl == null) {
  
        rsp.add(STATUS,ERR_STATUS);
  
        rsp.add("message","No slave configured or no 'masterUrl' Specified");
  
        return;
  
      }
  
      final SolrParams paramsCopy = new ModifiableSolrParams(solrParams);
  
      new Thread() {
  
        @Override
  
        public void run() {
  
          doFetch(paramsCopy);
  
        }
  
      }.start();
  
      rsp.add(STATUS, OK_STATUS);
  
具体的获取动作是通过SnapPuller对象来实现的,首先尝试获取pull对象锁,如果请求锁失败,则说明还有取索引数据动作未结束,如果请求锁成功,就调用SnapPuller对象的fetchLatestIndex方法来取最新的索引数据。
  
void doFetch(SolrParams solrParams) {
  
    String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
  
    if (!snapPullLock.tryLock())
  
      return;
  
    try {
  
      tempSnapPuller = snapPuller;
  
      if (masterUrl != null) {
  
        NamedList<Object> nl = solrParams.toNamedList();
  
        nl.remove(SnapPuller.POLL_INTERVAL);
  
        tempSnapPuller = new SnapPuller(nl, this, core);
  
      }
  
      tempSnapPuller.fetchLatestIndex(core);
  
    } catch (Exception e) {
  
      LOG.error("SnapPull failed ", e);
  
    } finally {
  
      tempSnapPuller = snapPuller;
  
      snapPullLock.unlock();
  
    }
  
}
  
最后真正的取索引数据过程,首先,若mastet节点的indexversion为0,则说明master节点根本没有提供可供复制的索引数据,若master节点和slave节点的indexversion相同,则说明slave节点目前与master节点索引数据状态保持一致,无需同步。若两者的indexversion不同,则开始索引复制过程,首先从master节点上下载指定索引版本号的索引文件列表,然后创建一个索引文件同步服务线程来完成同并工作。
  
这里需要区分的是,如果master节点的年代比slave节点要老,那就说明两者已经不相容,此时slave节点需要新建一个索引目录,再从master节点做一次全量索引复制。还需要注意的一点是,索引同步也是可以同步配置文件的,若配置文件发生变化,则需要对solr核进行一次reload操作。最对了,还有,和文章开头一样, slave节点同步完数据后,别忘了做一次commit操作,以便刷新自己的索引提交点到最新的状态。最后,关闭并等待同步服务线程结束。此外,具体的取索引文件是通过FileFetcher对象来完成。
  
boolean fetchLatestIndex(SolrCore core) throws IOException {
  
    replicationStartTime = System.currentTimeMillis();
  
    try {
  
      //get the current 'replicateable' index version in the master
  
      NamedList response = null;
  
      try {
  
        response = getLatestVersion();
  
      } catch (Exception e) {
  
        LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
  
        return false;
  
      }
  
      long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
  
      long latestGeneration = (Long) response.get(GENERATION);
  
      if (latestVersion == 0L) {
  
        //there is nothing to be replicated
  
        return false;
  
      }
  
      IndexCommit commit;
  
      RefCounted<SolrIndexSearcher> searcherRefCounted = null;
  
      try {
  
        searcherRefCounted = core.getNewestSearcher(false);
  
        commit = searcherRefCounted.get().getReader().getIndexCommit();
  
      } finally {
  
        if (searcherRefCounted != null)
  
          searcherRefCounted.decref();
  
      }
  
      if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
  
        //master and slave are alsready in sync just return
  
        LOG.info("Slave in sync with master.");
  
        return false;
  
      }
  
      LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration);
  
      LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());
  
      LOG.info("Starting replication process");
  
      // get the list of files first
  
      fetchFileList(latestVersion);
  
      // this can happen if the commit point is deleted before we fetch the file list.
  
      if(filesToDownload.isEmpty()) return false;
  
      LOG.info("Number of files in latest index in master: " + filesToDownload.size());
  
  
      // Create the sync service
  
      fsyncService = Executors.newSingleThreadExecutor();
  
      // use a synchronized list because the list is read by other threads (to show details)
  
      filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
  
      // if the generateion of master is older than that of the slave , it means they are not compatible to be copied
  
      // then a new index direcory to be created and all the files need to be copied
  
      boolean isFullCopyNeeded = commit.getGeneration() >= latestGeneration;
  
      File tmpIndexDir = createTempindexDir(core);
  
      if (isIndexStale())
  
        isFullCopyNeeded = true;
  
      successfulInstall = false;
  
      boolean deleteTmpIdxDir = true;
  
      File indexDir = null ;
  
      try {
  
        indexDir = new File(core.getIndexDir());
  
        downloadIndexFiles(isFullCopyNeeded, tmpIndexDir, latestVersion);
  
        LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");
  
        Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
  
        if (!modifiedConfFiles.isEmpty()) {
  
          downloadConfFiles(confFilesToDownload, latestVersion);
  
          if (isFullCopyNeeded) {
  
            successfulInstall = modifyIndexProps(tmpIndexDir.getName());
  
            deleteTmpIdxDir = false;
  
          } else {
  
            successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);
  
          }
  
          if (successfulInstall) {
  
            LOG.info("Configuration files are modified, core will be reloaded");
  
            logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);//write to a file time of replication and conf files.
  
            reloadCore();
  
          }
  
        } else {
  
          terminateAndWaitFsyncService();
  
          if (isFullCopyNeeded) {
  
            successfulInstall = modifyIndexProps(tmpIndexDir.getName());
  
            deleteTmpIdxDir = false;
  
          } else {
  
            successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);
  
          }
  
          if (successfulInstall) {
  
            logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
  
            doCommit();
  
          }
  
        }
  
        replicationStartTime = 0;
  
        return successfulInstall;
  
      } catch (ReplicationHandlerException e) {
  
        LOG.error("User aborted Replication");
  
      } catch (SolrException e) {
  
        throw e;
  
      } catch (Exception e) {
  
        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);
  
      } finally {
  
        if (deleteTmpIdxDir) delTree(tmpIndexDir);
  
        else delTree(indexDir);
  
      }
  
      return successfulInstall;
  
    } finally {
  
      if (!successfulInstall) {
  
        logReplicationTimeAndConfFiles(null, successfulInstall);
  
      }
  
      filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
  
      replicationStartTime = 0;
  
      fileFetcher = null;
  
      if (fsyncService != null && !fsyncService.isShutdown()) fsyncService.shutdownNow();
  
      fsyncService = null;
  
      stop = false;
  
      fsyncException = null;
  
    }
  
}
[url=][/url]


作者:洞庭散人
出处:http://phinecos.cnblogs.com/ 
http://www.cnblogs.com/phinecos/archive/2012/02/29/2372682.html
深入剖析SolrCloud(四)

该会员没有填写今日想说内容.
您需要登录后才可以回帖 登录 | 立即注册 新浪微博账号登陆

回顶部