一、前言
由于本站基于freemarker模板引擎合成生成html页面,一个页面产生需要经过栏目、模板及相关的数据关联转换,最后统一转换发布数据模型PublishEntity(如下),除此之外对于分页及上一页、下一页等页面展示逻辑又需要另外考虑,像这些处理都是基于数据库查询SQL算法获取数据,因此会有n次连接查库I\O处理,那可以想象随着内容增加,发布会越来越慢,实际观察统计8小时最多发布1000篇文章,因此考虑系统长远发展,对强大的外置分布式发布引擎(如下图)的需求变得由为的重要。
public class PublishEntity implements Serializable {@b@ @b@ private final long serialVersionUID = 169940596441947331L;@b@@b@ private PublishVo publishVo;// 队列@b@@b@ private Content content;// 内容@b@@b@ private ContentFolder channle;// 栏目@b@@b@ private Templet templet;//模板@b@@b@ private Map<?, ?> map; // 内容对象数据 @b@ ..@b@}
二、主要设计考虑
1.多线程处理 - 原来发布安装队列方式依次处理,无疑再等等N*N次查询库IO等待,基于多线程发布,减少多任务连续叠加等待次数
2.减少查库次数,只在提取数据时候查库,其余在引擎初始化阶段把基础数据和关系数据cached到memcached(不要选用redis,redis单线程排队处理,延迟多线程并发时间)
3.分布式发布引擎,后期随着内容无限增长,如需全站重新发布等待时间会超长,因此可以通过增加多台外置发布引擎来有效提速度(提取任务的时候,需要基于redis分布式锁有效控制重复提取数据)
4.单引擎内部解耦,由于发布引擎内容关联数据比较多和模板展示要求比较多变,因此觉得了单个处理引擎处理复杂度还是比较高,需要定义内部处理的不同阶段,清晰的定义各个阶段边界,利于系统后期扩展和代码review
三、主要代码分享
package com.xwood.publisher.core;@b@@b@import java.util.ArrayList;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Timer;@b@import java.util.TimerTask;@b@import java.util.concurrent.BlockingQueue;@b@import java.util.concurrent.Executors;@b@import java.util.concurrent.LinkedBlockingQueue;@b@import java.util.concurrent.ScheduledExecutorService;@b@@b@import org.apache.commons.lang.StringUtils;@b@import org.apache.log4j.Logger;@b@@b@import com.xwood.publisher.BootProcesserHandler;@b@import com.xwood.publisher.config.ExtConstants;@b@import com.xwood.publisher.core.processunit.S3PEntityRedisProcessUnit;@b@import com.xwood.publisher.core.processunit.SlavePublishEngineProcessUnit;@b@import com.xwood.publisher.master.AbstractProcesserHandler;@b@import com.xwood.publisher.master.ContentFolderProcesserHandler;@b@import com.xwood.publisher.master.FtpServerProcesserHandler;@b@import com.xwood.publisher.master.IDataBridge;@b@import com.xwood.publisher.master.TempletProcesserHandler;@b@import com.xwood.publisher.master.TinContentMapProcesserHandler;@b@import com.xwood.publisher.pojo.PublishEntity;@b@import com.xwood.publisher.util.JedisUtil;@b@import com.xwood.publisher.util.XMemcacheClient;@b@@b@public class MutiProcesserUnitHandler extends IDataBridge implements@b@ BootProcesserHandler {@b@ @b@ private static MutiProcesserUnitHandler instance=new MutiProcesserUnitHandler();@b@ @b@ protected static Logger logger = Logger.getLogger(MutiProcesserUnitHandler.class);@b@ @b@ private Runnable watchworker;@b@ @b@ private Timer watcherMgr;@b@ @b@ private ScheduledExecutorService workerMgr;@b@ @b@ public static BlockingQueue<Map<String,Object>> dm_datas_container=new LinkedBlockingQueue<Map<String,Object>>();@b@ @b@ public static BlockingQueue<PublishEntity> s1_datas_container=new LinkedBlockingQueue<PublishEntity>();@b@ @b@ private StringBuffer tofinished_coid=new StringBuffer("");@b@ private StringBuffer tofinished_puid=new StringBuffer("");@b@ private StringBuffer tofinished_errid=new StringBuffer("");@b@ @b@ @b@ private MutiProcesserUnitHandler(){@b@ watchworker=new TimerTask() { @b@ public void run() { @b@ @b@ load();@b@ @b@ extract();@b@ @b@ toPublish();@b@ @b@ toFinshed();@b@ @b@ toReceive();@b@ @b@ System.out.println("---------------------------------------------------------------------------------------");@b@ System.out.println("------------------------------ watchworker ---------------------------------------------");@b@ System.out.println("---------------------------------------------------------------------------------------");@b@ @b@ } @b@ };@b@ @b@ watcherMgr= new Timer(); @b@ @b@ workerMgr=Executors.newScheduledThreadPool(Integer.parseInt(ExtConstants.getString("muti.core.size")));@b@ @b@ }@b@ @b@ public static MutiProcesserUnitHandler getExecutor(){@b@ return instance;@b@ }@b@ @b@ private void init(){@b@ @b@ List<AbstractProcesserHandler> initprocessers=new ArrayList<AbstractProcesserHandler>();@b@ initprocessers.add(new FtpServerProcesserHandler());@b@ initprocessers.add(new ContentFolderProcesserHandler());@b@ initprocessers.add(new TempletProcesserHandler());@b@ @b@ boolean isClearJRedisCache=Boolean.parseBoolean(ExtConstants.getString("redis.flush"));@b@ boolean isClearMemCache=Boolean.parseBoolean(ExtConstants.getString("memcached.flush"));@b@ if(isClearMemCache){@b@ XMemcacheClient.flush();@b@ }@b@ if(isClearJRedisCache){@b@ JedisUtil.flush();@b@ for(AbstractProcesserHandler processer:initprocessers){@b@ processer.run();@b@ logger.info(" 【MutiProcesserUnitHandler @ init】 ... "+processer.toString());@b@ }@b@ }@b@ }@b@ @b@ private void load(){@b@ SingleUnitDao.initProcesserDatasIds();@b@ List<Map<String,Object>> dmdatas=SingleUnitDao.getProcesserS0Data();@b@ if(dmdatas!=null&&!dmdatas.isEmpty()){@b@ dm_datas_container.addAll(dmdatas);@b@ logger.info(" 【MutiProcesserUnitHandler @ load】 ... "+dm_datas_container.size());@b@ }@b@ }@b@ @b@ private void extract(){@b@ @b@ while(dm_datas_container.size()>0){@b@ @b@ Map<String,Object> srcobj=dm_datas_container.poll();@b@ @b@ S3PEntityRedisProcessUnit s3handler=new S3PEntityRedisProcessUnit(srcobj,new TinContentMapProcesserHandler());@b@ @b@ logger.info(" 【workerMgr @ s3handler】 ........................... "+s3handler.toString());@b@ @b@// workerMgr.submit(s3handler);@b@ try {@b@ workerMgr.execute(s3handler);@b@ } catch (Exception e) {@b@ SlavePublishEngineProcessUnit.id_error_container.add(s3handler.getTargetInnerPublishvo().getId());@b@ logger.error("【S3PEntityRedisProcessUnit searcher error】 ", e.getCause());@b@ continue;@b@ }@b@ @b@ }@b@ @b@ }@b@ @b@ @b@ private void toPublish(){@b@ @b@ while(s1_datas_container.size()>0){@b@ @b@ PublishEntity s1_pentity=s1_datas_container.poll();@b@ @b@ if(s1_pentity==null)@b@ continue;@b@ @b@ Runnable s1handler=new SlavePublishEngineProcessUnit(s1_pentity);@b@ workerMgr.execute(s1handler);@b@ }@b@ @b@ }@b@ @b@ private void toFinshed(){@b@ @b@ while(SlavePublishEngineProcessUnit.sf_ids_container.size()>0){@b@ @b@ String pconidsk=null;@b@ @b@ if(SlavePublishEngineProcessUnit.sf_ids_container.size()>0){@b@ pconidsk=SlavePublishEngineProcessUnit.sf_ids_container.poll();@b@ }@b@ @b@ logger.info(" 【workerMgr @ toFinshed】 ..................pconidsk......... "+pconidsk);@b@ @b@ if(tofinished_coid.toString().split(",").length>20){@b@ tofinished_coid.append("''");@b@ @b@ SingleUnitDao.setStatusContentFinished(tofinished_coid.toString());@b@ @b@ tofinished_coid=new StringBuffer("");@b@ }@b@ @b@ @b@ if(tofinished_puid.toString().split(",").length>20){@b@ tofinished_puid.append("''");@b@ @b@ SingleUnitDao.setStatusPublishQueueFinished(tofinished_puid.toString());@b@ @b@ tofinished_puid=new StringBuffer("");@b@ }@b@ @b@ if(!StringUtils.isEmpty(pconidsk)){@b@ @b@ String[] idsks=pconidsk.split("@");@b@ @b@ if(idsks!=null&&idsks.length>0){@b@ @b@ if(!StringUtils.isEmpty(idsks[0]))@b@ tofinished_puid.append("'").append(idsks[0]).append("'").append(",");@b@ @b@ if(!StringUtils.isEmpty(idsks[1]))@b@ tofinished_coid.append("'").append(idsks[1]).append("'").append(",");@b@ }@b@ @b@ }@b@ @b@ }@b@ @b@ if(SlavePublishEngineProcessUnit.sf_ids_container.size()==0){@b@ @b@ tofinished_coid.append("''");@b@ tofinished_puid.append("''");@b@ @b@ SingleUnitDao.setStatusPublishQueueFinished(tofinished_puid.toString());@b@ SingleUnitDao.setStatusContentFinished(tofinished_coid.toString());@b@ @b@ tofinished_coid=new StringBuffer("");@b@ tofinished_puid=new StringBuffer("");@b@ }@b@ @b@ @b@ @b@ while(SlavePublishEngineProcessUnit.id_error_container.size()>0){@b@ String errorid =null;@b@ @b@ if(SlavePublishEngineProcessUnit.id_error_container.size()>0)@b@ errorid =SlavePublishEngineProcessUnit.id_error_container.poll();@b@ @b@ if(tofinished_errid.toString().split(",").length>20){@b@ tofinished_errid.append("''");@b@ SingleUnitDao.setStatusErrPublishStatusDb(tofinished_errid.toString());@b@ tofinished_errid=new StringBuffer("");@b@ } @b@ @b@ if(!StringUtils.isEmpty(errorid))@b@ tofinished_errid.append("'").append(errorid).append("'").append(",");@b@ }@b@ @b@ if(SlavePublishEngineProcessUnit.id_error_container.size()==0){@b@ tofinished_errid.append("''");@b@ SingleUnitDao.setStatusErrPublishStatusDb(tofinished_errid.toString());@b@ tofinished_errid=new StringBuffer("");@b@ }@b@ @b@ }@b@ @b@ @b@ private void toReceive(){@b@ if(dm_datas_container.isEmpty()){@b@ @b@ }@b@ }@b@@b@ @b@ @Override@b@ public void run() {@b@ init();@b@ watcherMgr.schedule((TimerTask)watchworker, 10,Integer.parseInt(ExtConstants.getString("muti.timer.delay4f")));// 设定指定@b@@b@ }@b@ @b@@b@ /**@b@ * @param 入口@b@ */@b@ public static void main(String[] args) {@b@ @b@ MutiProcesserUnitHandler.getExecutor().run();@b@ @b@ Runtime.getRuntime().addShutdownHook(new Thread() { @b@ public void run() { @b@ @b@ try{@b@ logger.info(" 【系统关闭】 准备进行hook处理 ...。.../n/n/n/n ");@b@ @b@ JedisUtil.flush();@b@ @b@ logger.info(" 【系统关闭】 系统成功SHUTDOWN ....../n/n/n/n ");@b@ @b@ } catch (Exception e) {@b@ e.printStackTrace();@b@ }@b@ @b@ } @b@ });@b@@b@ }@b@@b@}
四、优化后性能说明
在部署单个外部引擎的情况下,发布1000条内容控制1分钟之内(在发布期间CPU持续在100%的利用率如下图1,在发布完成后,cpu资源又能恢复正常下图2),性能提供了48000%
图1
图2