首页

【数据处理系统】关于本站xwood.net的内容分布式外置发布引擎主要设计分享

标签:系统设计,etl,性能优化,memcached,redis,分布式,多线程,并发,线程同步,缓存     发布时间:2017-08-29   

一、前言

由于本站基于freemarker模板引擎合成生成html页面,一个页面产生需要经过栏目、模板及相关的数据关联转换,最后统一转换发布数据模型PublishEntity(如下),除此之外对于分页及上一页、下一页等页面展示逻辑又需要另外考虑,像这些处理都是基于数据库查询SQL算法获取数据,因此会有n次连接查库I\O处理,那可以想象随着内容增加,发布会越来越慢,实际观察统计8小时最多发布1000篇文章,因此考虑系统长远发展,对强大的外置分布式发布引擎(如下图)的需求变得由为的重要。

【数据处理系统】关于本站xwood.net的内容分布式外置发布引擎主要设计分享

public class PublishEntity implements Serializable {@b@ @b@	private   final long serialVersionUID = 169940596441947331L;@b@@b@	private PublishVo publishVo;// 队列@b@@b@	private Content content;// 内容@b@@b@	private ContentFolder channle;// 栏目@b@@b@	private Templet templet;//模板@b@@b@	private Map<?, ?> map; // 内容对象数据 @b@	..@b@}

二、主要设计考虑

1.多线程处理 - 原来发布安装队列方式依次处理,无疑再等等N*N次查询库IO等待,基于多线程发布,减少多任务连续叠加等待次数

2.减少查库次数,只在提取数据时候查库,其余在引擎初始化阶段把基础数据和关系数据cached到memcached(不要选用redis,redis单线程排队处理,延迟多线程并发时间)

3.分布式发布引擎,后期随着内容无限增长,如需全站重新发布等待时间会超长,因此可以通过增加多台外置发布引擎来有效提速度(提取任务的时候,需要基于redis分布式锁有效控制重复提取数据)

4.单引擎内部解耦,由于发布引擎内容关联数据比较多和模板展示要求比较多变,因此觉得了单个处理引擎处理复杂度还是比较高,需要定义内部处理的不同阶段,清晰的定义各个阶段边界,利于系统后期扩展和代码review

三、主要代码分享

package com.xwood.publisher.core;@b@@b@import java.util.ArrayList;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Timer;@b@import java.util.TimerTask;@b@import java.util.concurrent.BlockingQueue;@b@import java.util.concurrent.Executors;@b@import java.util.concurrent.LinkedBlockingQueue;@b@import java.util.concurrent.ScheduledExecutorService;@b@@b@import org.apache.commons.lang.StringUtils;@b@import org.apache.log4j.Logger;@b@@b@import com.xwood.publisher.BootProcesserHandler;@b@import com.xwood.publisher.config.ExtConstants;@b@import com.xwood.publisher.core.processunit.S3PEntityRedisProcessUnit;@b@import com.xwood.publisher.core.processunit.SlavePublishEngineProcessUnit;@b@import com.xwood.publisher.master.AbstractProcesserHandler;@b@import com.xwood.publisher.master.ContentFolderProcesserHandler;@b@import com.xwood.publisher.master.FtpServerProcesserHandler;@b@import com.xwood.publisher.master.IDataBridge;@b@import com.xwood.publisher.master.TempletProcesserHandler;@b@import com.xwood.publisher.master.TinContentMapProcesserHandler;@b@import com.xwood.publisher.pojo.PublishEntity;@b@import com.xwood.publisher.util.JedisUtil;@b@import com.xwood.publisher.util.XMemcacheClient;@b@@b@public class MutiProcesserUnitHandler extends IDataBridge implements@b@		BootProcesserHandler {@b@	@b@	private   static  MutiProcesserUnitHandler instance=new MutiProcesserUnitHandler();@b@	@b@	protected static Logger logger = Logger.getLogger(MutiProcesserUnitHandler.class);@b@	@b@	private  Runnable watchworker;@b@	@b@	private  Timer  watcherMgr;@b@	@b@	private  ScheduledExecutorService workerMgr;@b@	@b@	public  static BlockingQueue<Map<String,Object>>  dm_datas_container=new LinkedBlockingQueue<Map<String,Object>>();@b@	@b@	public  static BlockingQueue<PublishEntity>  s1_datas_container=new LinkedBlockingQueue<PublishEntity>();@b@	@b@	private  StringBuffer tofinished_coid=new StringBuffer("");@b@	private  StringBuffer tofinished_puid=new StringBuffer("");@b@	private  StringBuffer tofinished_errid=new StringBuffer("");@b@	@b@	@b@	private  MutiProcesserUnitHandler(){@b@		watchworker=new TimerTask() {  @b@			public void run() {  @b@				@b@				load();@b@				@b@				extract();@b@				@b@				toPublish();@b@				@b@				toFinshed();@b@				@b@				toReceive();@b@				@b@				System.out.println("---------------------------------------------------------------------------------------");@b@				System.out.println("------------------------------ watchworker ---------------------------------------------");@b@				System.out.println("---------------------------------------------------------------------------------------");@b@				@b@	       	}  @b@	  };@b@	  @b@	  watcherMgr= new Timer(); @b@	  @b@	  workerMgr=Executors.newScheduledThreadPool(Integer.parseInt(ExtConstants.getString("muti.core.size")));@b@	  @b@	}@b@	@b@	public static MutiProcesserUnitHandler getExecutor(){@b@    	return instance;@b@    }@b@	@b@	private  void init(){@b@		@b@		List<AbstractProcesserHandler>  initprocessers=new ArrayList<AbstractProcesserHandler>();@b@		initprocessers.add(new FtpServerProcesserHandler());@b@		initprocessers.add(new ContentFolderProcesserHandler());@b@		initprocessers.add(new TempletProcesserHandler());@b@		@b@		boolean isClearJRedisCache=Boolean.parseBoolean(ExtConstants.getString("redis.flush"));@b@		boolean isClearMemCache=Boolean.parseBoolean(ExtConstants.getString("memcached.flush"));@b@		if(isClearMemCache){@b@			XMemcacheClient.flush();@b@		}@b@		if(isClearJRedisCache){@b@			JedisUtil.flush();@b@			for(AbstractProcesserHandler processer:initprocessers){@b@				processer.run();@b@				logger.info(" 【MutiProcesserUnitHandler @ init】 ... "+processer.toString());@b@			}@b@		}@b@	}@b@	@b@	private void load(){@b@		SingleUnitDao.initProcesserDatasIds();@b@		List<Map<String,Object>> dmdatas=SingleUnitDao.getProcesserS0Data();@b@		if(dmdatas!=null&&!dmdatas.isEmpty()){@b@			dm_datas_container.addAll(dmdatas);@b@			logger.info(" 【MutiProcesserUnitHandler @ load】 ... "+dm_datas_container.size());@b@		}@b@	}@b@	@b@	private void  extract(){@b@		@b@		while(dm_datas_container.size()>0){@b@			@b@			Map<String,Object>  srcobj=dm_datas_container.poll();@b@			@b@			S3PEntityRedisProcessUnit  s3handler=new S3PEntityRedisProcessUnit(srcobj,new TinContentMapProcesserHandler());@b@			@b@			logger.info(" 【workerMgr @ s3handler】 ........................... "+s3handler.toString());@b@			@b@//			workerMgr.submit(s3handler);@b@			try {@b@				workerMgr.execute(s3handler);@b@			} catch (Exception e) {@b@				SlavePublishEngineProcessUnit.id_error_container.add(s3handler.getTargetInnerPublishvo().getId());@b@				logger.error("【S3PEntityRedisProcessUnit  searcher  error】 ", e.getCause());@b@				continue;@b@			}@b@			@b@		}@b@		@b@	}@b@	@b@	@b@	private  void  toPublish(){@b@		@b@		while(s1_datas_container.size()>0){@b@			@b@			PublishEntity  s1_pentity=s1_datas_container.poll();@b@			@b@			if(s1_pentity==null)@b@				continue;@b@			@b@			Runnable  s1handler=new SlavePublishEngineProcessUnit(s1_pentity);@b@			workerMgr.execute(s1handler);@b@		}@b@		@b@	}@b@	@b@	private void  toFinshed(){@b@		@b@		while(SlavePublishEngineProcessUnit.sf_ids_container.size()>0){@b@			@b@			String  pconidsk=null;@b@			@b@			if(SlavePublishEngineProcessUnit.sf_ids_container.size()>0){@b@			   pconidsk=SlavePublishEngineProcessUnit.sf_ids_container.poll();@b@			}@b@			@b@			logger.info(" 【workerMgr @ toFinshed】 ..................pconidsk......... "+pconidsk);@b@				@b@			if(tofinished_coid.toString().split(",").length>20){@b@				tofinished_coid.append("''");@b@					  @b@				SingleUnitDao.setStatusContentFinished(tofinished_coid.toString());@b@					  @b@				tofinished_coid=new StringBuffer("");@b@			}@b@				@b@				@b@			if(tofinished_puid.toString().split(",").length>20){@b@				tofinished_puid.append("''");@b@					  @b@				SingleUnitDao.setStatusPublishQueueFinished(tofinished_puid.toString());@b@					  @b@				tofinished_puid=new StringBuffer("");@b@			}@b@				@b@			if(!StringUtils.isEmpty(pconidsk)){@b@					@b@				String[] idsks=pconidsk.split("@");@b@					@b@				if(idsks!=null&&idsks.length>0){@b@					@b@					if(!StringUtils.isEmpty(idsks[0]))@b@						tofinished_puid.append("'").append(idsks[0]).append("'").append(",");@b@						@b@					if(!StringUtils.isEmpty(idsks[1]))@b@						tofinished_coid.append("'").append(idsks[1]).append("'").append(",");@b@				}@b@				@b@			}@b@				@b@		}@b@		@b@		if(SlavePublishEngineProcessUnit.sf_ids_container.size()==0){@b@			@b@			tofinished_coid.append("''");@b@			tofinished_puid.append("''");@b@			@b@			SingleUnitDao.setStatusPublishQueueFinished(tofinished_puid.toString());@b@			SingleUnitDao.setStatusContentFinished(tofinished_coid.toString());@b@			@b@			tofinished_coid=new StringBuffer("");@b@			tofinished_puid=new StringBuffer("");@b@		}@b@		@b@			@b@		@b@		while(SlavePublishEngineProcessUnit.id_error_container.size()>0){@b@			String errorid =null;@b@			@b@			if(SlavePublishEngineProcessUnit.id_error_container.size()>0)@b@			  errorid =SlavePublishEngineProcessUnit.id_error_container.poll();@b@			@b@			if(tofinished_errid.toString().split(",").length>20){@b@				tofinished_errid.append("''");@b@				SingleUnitDao.setStatusErrPublishStatusDb(tofinished_errid.toString());@b@				tofinished_errid=new StringBuffer("");@b@			} @b@			@b@			if(!StringUtils.isEmpty(errorid))@b@				tofinished_errid.append("'").append(errorid).append("'").append(",");@b@		}@b@		@b@		if(SlavePublishEngineProcessUnit.id_error_container.size()==0){@b@			tofinished_errid.append("''");@b@			SingleUnitDao.setStatusErrPublishStatusDb(tofinished_errid.toString());@b@			tofinished_errid=new StringBuffer("");@b@		}@b@		@b@	}@b@	@b@	@b@	private  void toReceive(){@b@		if(dm_datas_container.isEmpty()){@b@			@b@		}@b@	}@b@@b@	@b@	@Override@b@	public void run() {@b@		init();@b@		watcherMgr.schedule((TimerTask)watchworker, 10,Integer.parseInt(ExtConstants.getString("muti.timer.delay4f")));// 设定指定@b@@b@	}@b@	@b@@b@	/**@b@	 * @param 入口@b@	 */@b@	public static void main(String[] args) {@b@		@b@		MutiProcesserUnitHandler.getExecutor().run();@b@		@b@		Runtime.getRuntime().addShutdownHook(new Thread() {  @b@            public void run() {  @b@            	@b@	         try{@b@	        	  logger.info(" 【系统关闭】 准备进行hook处理 ...。.../n/n/n/n  ");@b@				  @b@				  JedisUtil.flush();@b@				  @b@				  logger.info(" 【系统关闭】 系统成功SHUTDOWN ....../n/n/n/n  ");@b@				  @b@				} catch (Exception e) {@b@						e.printStackTrace();@b@				}@b@            @b@            }  @b@        });@b@@b@	}@b@@b@}

四、优化后性能说明

在部署单个外部引擎的情况下,发布1000条内容控制1分钟之内(在发布期间CPU持续在100%的利用率如下图1,在发布完成后,cpu资源又能恢复正常下图2),性能提供了48000%

图1

【数据处理系统】关于本站xwood.net的内容分布式外置发布引擎主要设计分享

图2

【数据处理系统】关于本站xwood.net的内容分布式外置发布引擎主要设计分享