一、前言
通过Apache的solr实现包依赖org.apache.solr.client.solrj.impl.CommonsHttpSolrServer(2.9.4)服务拓展使用org.apache.camel.component.solr.SolrEndpoint、org.apache.solr.common.SolrInputDocument.SolrProducer自定义apache solr端应用服务。
二、源码说明
1.SolrEndpoint部分
package org.apache.camel.component.solr;@b@@b@import java.util.Map;@b@import org.apache.camel.Consumer;@b@import org.apache.camel.Processor;@b@import org.apache.camel.Producer;@b@import org.apache.camel.impl.DefaultEndpoint;@b@import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;@b@import org.apache.solr.client.solrj.impl.StreamingUpdateSolrServer;@b@@b@public class SolrEndpoint extends DefaultEndpoint@b@{@b@ private CommonsHttpSolrServer solrServer;@b@ private CommonsHttpSolrServer streamingSolrServer;@b@ private String requestHandler;@b@ private int streamingThreadCount;@b@ private int streamingQueueSize;@b@@b@ public SolrEndpoint(String endpointUri, SolrComponent component, String address, Map<String, Object> parameters)@b@ throws Exception@b@ {@b@ super(endpointUri, component);@b@@b@ this.solrServer = new CommonsHttpSolrServer("http://" + address);@b@ this.streamingQueueSize = getIntFromString((String)parameters.get("streamingQueueSize"), 10);@b@ this.streamingThreadCount = getIntFromString((String)parameters.get("streamingThreadCount"), 2);@b@ this.streamingSolrServer = new StreamingUpdateSolrServer("http://" + address, this.streamingQueueSize, this.streamingThreadCount);@b@ }@b@@b@ public static int getIntFromString(String value, int defaultValue) {@b@ if ((value != null) && (value.length() > 0))@b@ return Integer.parseInt(value);@b@@b@ return defaultValue;@b@ }@b@@b@ public Producer createProducer() throws Exception@b@ {@b@ return new SolrProducer(this);@b@ }@b@@b@ public Consumer createConsumer(Processor processor) throws Exception@b@ {@b@ throw new UnsupportedOperationException("Consumer not supported for Solr endpoint.");@b@ }@b@@b@ public boolean isSingleton()@b@ {@b@ return true;@b@ }@b@@b@ public CommonsHttpSolrServer getSolrServer() {@b@ return this.solrServer;@b@ }@b@@b@ public CommonsHttpSolrServer getStreamingSolrServer() {@b@ return this.streamingSolrServer;@b@ }@b@@b@ public void setStreamingSolrServer(CommonsHttpSolrServer streamingSolrServer) {@b@ this.streamingSolrServer = streamingSolrServer;@b@ }@b@@b@ public void setMaxRetries(int maxRetries) {@b@ this.solrServer.setMaxRetries(maxRetries);@b@ this.streamingSolrServer.setMaxRetries(maxRetries);@b@ }@b@@b@ public void setSoTimeout(int soTimeout) {@b@ this.solrServer.setSoTimeout(soTimeout);@b@ this.streamingSolrServer.setSoTimeout(soTimeout);@b@ }@b@@b@ public void setConnectionTimeout(int connectionTimeout) {@b@ this.solrServer.setConnectionTimeout(connectionTimeout);@b@ this.streamingSolrServer.setConnectionTimeout(connectionTimeout);@b@ }@b@@b@ public void setDefaultMaxConnectionsPerHost(int defaultMaxConnectionsPerHost) {@b@ this.solrServer.setDefaultMaxConnectionsPerHost(defaultMaxConnectionsPerHost);@b@ this.streamingSolrServer.setDefaultMaxConnectionsPerHost(defaultMaxConnectionsPerHost);@b@ }@b@@b@ public void setMaxTotalConnections(int maxTotalConnections) {@b@ this.solrServer.setMaxTotalConnections(maxTotalConnections);@b@ this.streamingSolrServer.setMaxTotalConnections(maxTotalConnections);@b@ }@b@@b@ public void setFollowRedirects(boolean followRedirects) {@b@ this.solrServer.setFollowRedirects(followRedirects);@b@ this.streamingSolrServer.setFollowRedirects(followRedirects);@b@ }@b@@b@ public void setAllowCompression(boolean allowCompression) {@b@ this.solrServer.setAllowCompression(allowCompression);@b@ this.streamingSolrServer.setAllowCompression(allowCompression);@b@ }@b@@b@ public void setRequestHandler(String requestHandler) {@b@ this.requestHandler = requestHandler;@b@ }@b@@b@ public String getRequestHandler() {@b@ return this.requestHandler;@b@ }@b@@b@ public int getStreamingThreadCount() {@b@ return this.streamingThreadCount;@b@ }@b@@b@ public void setStreamingThreadCount(int streamingThreadCount) {@b@ this.streamingThreadCount = streamingThreadCount;@b@ }@b@@b@ public int getStreamingQueueSize() {@b@ return this.streamingQueueSize;@b@ }@b@@b@ public void setStreamingQueueSize(int streamingQueueSize) {@b@ this.streamingQueueSize = streamingQueueSize;@b@ }@b@}
2.SolrProducer部分
package org.apache.camel.component.solr;@b@@b@import java.io.File;@b@import java.util.Map;@b@import java.util.Map.Entry;@b@import org.apache.camel.Exchange;@b@import org.apache.camel.Message;@b@import org.apache.camel.WrappedFile;@b@import org.apache.camel.impl.DefaultProducer;@b@import org.apache.solr.client.solrj.SolrServer;@b@import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;@b@import org.apache.solr.client.solrj.request.DirectXmlRequest;@b@import org.apache.solr.client.solrj.request.UpdateRequest;@b@import org.apache.solr.common.SolrException;@b@import org.apache.solr.common.SolrException.ErrorCode;@b@import org.apache.solr.common.SolrInputDocument;@b@@b@public class SolrProducer extends DefaultProducer@b@{@b@ private SolrServer solrServer;@b@ private SolrServer streamingSolrServer;@b@@b@ public SolrProducer(SolrEndpoint endpoint)@b@ {@b@ super(endpoint);@b@ this.solrServer = endpoint.getSolrServer();@b@ this.streamingSolrServer = endpoint.getStreamingSolrServer();@b@ }@b@@b@ public void process(Exchange exchange)@b@ throws Exception@b@ {@b@ String operation = (String)exchange.getIn().getHeader("SolrOperation");@b@@b@ if (operation == null) {@b@ throw new IllegalArgumentException("SolrOperation header is missing");@b@ }@b@@b@ if (operation.equalsIgnoreCase("INSERT"))@b@ insert(exchange, false);@b@ else if (operation.equalsIgnoreCase("INSERT_STREAMING"))@b@ insert(exchange, true);@b@ else if (operation.equalsIgnoreCase("DELETE_BY_ID"))@b@ this.solrServer.deleteById((String)exchange.getIn().getBody(String.class));@b@ else if (operation.equalsIgnoreCase("DELETE_BY_QUERY"))@b@ this.solrServer.deleteByQuery((String)exchange.getIn().getBody(String.class));@b@ else if (operation.equalsIgnoreCase("ADD_BEAN"))@b@ this.solrServer.addBean(exchange.getIn().getBody());@b@ else if (operation.equalsIgnoreCase("COMMIT"))@b@ this.solrServer.commit();@b@ else if (operation.equalsIgnoreCase("ROLLBACK"))@b@ this.solrServer.rollback();@b@ else if (operation.equalsIgnoreCase("OPTIMIZE"))@b@ this.solrServer.optimize();@b@ else@b@ throw new IllegalArgumentException("SolrOperation header value '" + operation + "' is not supported");@b@ }@b@@b@ private void insert(Exchange exchange, boolean isStreaming)@b@ throws Exception@b@ {@b@ Object body = exchange.getIn().getBody();@b@ if (body instanceof WrappedFile) {@b@ body = ((WrappedFile)body).getFile();@b@ }@b@@b@ if (body instanceof File)@b@ {@b@ ContentStreamUpdateRequest updateRequest = new ContentStreamUpdateRequest(getRequestHandler());@b@ updateRequest.addFile((File)body);@b@@b@ for (Map.Entry entry : exchange.getIn().getHeaders().entrySet())@b@ if (((String)entry.getKey()).startsWith("SolrParam.")) {@b@ String paramName = ((String)entry.getKey()).substring("SolrParam.".length());@b@ updateRequest.setParam(paramName, entry.getValue().toString());@b@ }@b@@b@@b@ if (isStreaming)@b@ updateRequest.process(this.streamingSolrServer);@b@ else@b@ updateRequest.process(this.solrServer);@b@@b@ }@b@ else if (body instanceof SolrInputDocument)@b@ {@b@ UpdateRequest updateRequest = new UpdateRequest(getRequestHandler());@b@ updateRequest.add((SolrInputDocument)body);@b@@b@ if (isStreaming)@b@ updateRequest.process(this.streamingSolrServer);@b@ else@b@ updateRequest.process(this.solrServer);@b@@b@ }@b@ else@b@ {@b@ boolean hasSolrHeaders = false;@b@ Map headers = exchange.getIn().getHeaders();@b@ for (Map.Entry entry : exchange.getIn().getHeaders().entrySet())@b@ if (((String)entry.getKey()).startsWith("SolrField.")) {@b@ hasSolrHeaders = true;@b@ break;@b@ }@b@@b@@b@ if (hasSolrHeaders)@b@ {@b@ UpdateRequest updateRequest = new UpdateRequest(getRequestHandler());@b@@b@ SolrInputDocument doc = new SolrInputDocument();@b@ for (Map.Entry entry : exchange.getIn().getHeaders().entrySet())@b@ if (((String)entry.getKey()).startsWith("SolrField.")) {@b@ String fieldName = ((String)entry.getKey()).substring("SolrField.".length());@b@ doc.setField(fieldName, entry.getValue());@b@ }@b@@b@ updateRequest.add(doc);@b@@b@ if (isStreaming)@b@ updateRequest.process(this.streamingSolrServer);@b@ else@b@ updateRequest.process(this.solrServer);@b@@b@ }@b@ else if (body instanceof String)@b@ {@b@ String bodyAsString = (String)body;@b@@b@ if (!(bodyAsString.startsWith("<add"))) {@b@ bodyAsString = "<add>" + bodyAsString + "</add>";@b@ }@b@@b@ DirectXmlRequest xmlRequest = new DirectXmlRequest(getRequestHandler(), bodyAsString);@b@@b@ if (isStreaming)@b@ this.streamingSolrServer.request(xmlRequest);@b@ else@b@ this.solrServer.request(xmlRequest);@b@ }@b@ else {@b@ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "unable to find data in Exchange to update Solr");@b@ }@b@ }@b@ }@b@@b@ private String getRequestHandler() {@b@ String requestHandler = getEndpoint().getRequestHandler();@b@ return ((requestHandler == null) ? "/update" : requestHandler);@b@ }@b@@b@ public SolrEndpoint getEndpoint()@b@ {@b@ return ((SolrEndpoint)super.getEndpoint());@b@ }@b@}