I. Introduction
This post looks at the camel-solr component (2.9.4), which talks to Apache Solr through SolrJ's org.apache.solr.client.solrj.impl.CommonsHttpSolrServer. The work is done by two classes, org.apache.camel.component.solr.SolrEndpoint and org.apache.camel.component.solr.SolrProducer (the latter builds org.apache.solr.common.SolrInputDocument update requests). Their source is reproduced below as a starting point for building a customized Solr endpoint for your own application.
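Before diving into the source, here is a minimal sketch of how such an endpoint is typically wired into a Camel route. The host, port and field names are placeholders; the "SolrOperation" and "SolrField.*" headers are the ones handled by the SolrProducer shown below.

```java
import org.apache.camel.builder.RouteBuilder;

// A minimal sketch, assuming camel-solr 2.9.4 is on the classpath.
// "solr://localhost:8983/solr" and the field names are placeholders.
public class SolrRouteSketch extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // Insert: the body plus SolrField.* headers become a SolrInputDocument
        from("direct:insert")
            .setHeader("SolrOperation", constant("INSERT"))
            .setHeader("SolrField.id", body())
            .to("solr://localhost:8983/solr");

        // Commit: no body required, the producer just calls commit()
        from("direct:commit")
            .setHeader("SolrOperation", constant("COMMIT"))
            .to("solr://localhost:8983/solr");
    }
}
```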
II. Source Code Walkthrough
1. SolrEndpoint
```java
package org.apache.camel.component.solr;

import java.util.Map;

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.impl.StreamingUpdateSolrServer;

public class SolrEndpoint extends DefaultEndpoint {

    // plain SolrJ server, used for all synchronous operations
    private CommonsHttpSolrServer solrServer;
    // StreamingUpdateSolrServer, used for the INSERT_STREAMING operation
    private CommonsHttpSolrServer streamingSolrServer;
    private String requestHandler;
    private int streamingThreadCount;
    private int streamingQueueSize;

    public SolrEndpoint(String endpointUri, SolrComponent component, String address,
                        Map<String, Object> parameters) throws Exception {
        super(endpointUri, component);
        this.solrServer = new CommonsHttpSolrServer("http://" + address);
        // queue size and thread count for the streaming server, defaulting to 10 and 2
        this.streamingQueueSize = getIntFromString((String) parameters.get("streamingQueueSize"), 10);
        this.streamingThreadCount = getIntFromString((String) parameters.get("streamingThreadCount"), 2);
        this.streamingSolrServer = new StreamingUpdateSolrServer("http://" + address,
                this.streamingQueueSize, this.streamingThreadCount);
    }

    public static int getIntFromString(String value, int defaultValue) {
        if ((value != null) && (value.length() > 0)) {
            return Integer.parseInt(value);
        }
        return defaultValue;
    }

    public Producer createProducer() throws Exception {
        return new SolrProducer(this);
    }

    public Consumer createConsumer(Processor processor) throws Exception {
        // this component is producer-only
        throw new UnsupportedOperationException("Consumer not supported for Solr endpoint.");
    }

    public boolean isSingleton() {
        return true;
    }

    public CommonsHttpSolrServer getSolrServer() {
        return this.solrServer;
    }

    public CommonsHttpSolrServer getStreamingSolrServer() {
        return this.streamingSolrServer;
    }

    public void setStreamingSolrServer(CommonsHttpSolrServer streamingSolrServer) {
        this.streamingSolrServer = streamingSolrServer;
    }

    // the tuning setters below forward to both underlying SolrJ servers

    public void setMaxRetries(int maxRetries) {
        this.solrServer.setMaxRetries(maxRetries);
        this.streamingSolrServer.setMaxRetries(maxRetries);
    }

    public void setSoTimeout(int soTimeout) {
        this.solrServer.setSoTimeout(soTimeout);
        this.streamingSolrServer.setSoTimeout(soTimeout);
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.solrServer.setConnectionTimeout(connectionTimeout);
        this.streamingSolrServer.setConnectionTimeout(connectionTimeout);
    }

    public void setDefaultMaxConnectionsPerHost(int defaultMaxConnectionsPerHost) {
        this.solrServer.setDefaultMaxConnectionsPerHost(defaultMaxConnectionsPerHost);
        this.streamingSolrServer.setDefaultMaxConnectionsPerHost(defaultMaxConnectionsPerHost);
    }

    public void setMaxTotalConnections(int maxTotalConnections) {
        this.solrServer.setMaxTotalConnections(maxTotalConnections);
        this.streamingSolrServer.setMaxTotalConnections(maxTotalConnections);
    }

    public void setFollowRedirects(boolean followRedirects) {
        this.solrServer.setFollowRedirects(followRedirects);
        this.streamingSolrServer.setFollowRedirects(followRedirects);
    }

    public void setAllowCompression(boolean allowCompression) {
        this.solrServer.setAllowCompression(allowCompression);
        this.streamingSolrServer.setAllowCompression(allowCompression);
    }

    public void setRequestHandler(String requestHandler) {
        this.requestHandler = requestHandler;
    }

    public String getRequestHandler() {
        return this.requestHandler;
    }

    public int getStreamingThreadCount() {
        return this.streamingThreadCount;
    }

    public void setStreamingThreadCount(int streamingThreadCount) {
        this.streamingThreadCount = streamingThreadCount;
    }

    public int getStreamingQueueSize() {
        return this.streamingQueueSize;
    }

    public void setStreamingQueueSize(int streamingQueueSize) {
        this.streamingQueueSize = streamingQueueSize;
    }
}
```
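The constructor pulls streamingQueueSize and streamingThreadCount out of the endpoint parameters (defaulting to 10 and 2) and creates both a plain CommonsHttpSolrServer and a StreamingUpdateSolrServer; every tuning setter forwards to both. A small sketch of configuring an endpoint this way, assuming camel-solr 2.9.4 is on the classpath (host, port and option values are illustrative):

```java
import org.apache.camel.CamelContext;
import org.apache.camel.component.solr.SolrEndpoint;
import org.apache.camel.impl.DefaultCamelContext;

// A sketch only: the URI and timeout values are placeholders.
public class SolrEndpointConfigSketch {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.start();

        // streamingQueueSize/streamingThreadCount are read in the SolrEndpoint constructor
        SolrEndpoint endpoint = context.getEndpoint(
                "solr://localhost:8983/solr?streamingQueueSize=20&streamingThreadCount=4",
                SolrEndpoint.class);

        // each setter forwards to both the plain and the streaming CommonsHttpSolrServer
        endpoint.setConnectionTimeout(5000); // HttpClient connect timeout (ms)
        endpoint.setSoTimeout(10000);        // socket read timeout (ms)
        endpoint.setMaxRetries(1);

        context.stop();
    }
}
```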
2. SolrProducer
```java
package org.apache.camel.component.solr;

import java.io.File;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.WrappedFile;
import org.apache.camel.impl.DefaultProducer;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.request.DirectXmlRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;

public class SolrProducer extends DefaultProducer {

    private SolrServer solrServer;
    private SolrServer streamingSolrServer;

    public SolrProducer(SolrEndpoint endpoint) {
        super(endpoint);
        this.solrServer = endpoint.getSolrServer();
        this.streamingSolrServer = endpoint.getStreamingSolrServer();
    }

    public void process(Exchange exchange) throws Exception {
        // the SolrOperation header decides which SolrJ call is made
        String operation = (String) exchange.getIn().getHeader("SolrOperation");
        if (operation == null) {
            throw new IllegalArgumentException("SolrOperation header is missing");
        }
        if (operation.equalsIgnoreCase("INSERT")) {
            insert(exchange, false);
        } else if (operation.equalsIgnoreCase("INSERT_STREAMING")) {
            insert(exchange, true);
        } else if (operation.equalsIgnoreCase("DELETE_BY_ID")) {
            this.solrServer.deleteById((String) exchange.getIn().getBody(String.class));
        } else if (operation.equalsIgnoreCase("DELETE_BY_QUERY")) {
            this.solrServer.deleteByQuery((String) exchange.getIn().getBody(String.class));
        } else if (operation.equalsIgnoreCase("ADD_BEAN")) {
            this.solrServer.addBean(exchange.getIn().getBody());
        } else if (operation.equalsIgnoreCase("COMMIT")) {
            this.solrServer.commit();
        } else if (operation.equalsIgnoreCase("ROLLBACK")) {
            this.solrServer.rollback();
        } else if (operation.equalsIgnoreCase("OPTIMIZE")) {
            this.solrServer.optimize();
        } else {
            throw new IllegalArgumentException("SolrOperation header value '" + operation + "' is not supported");
        }
    }

    private void insert(Exchange exchange, boolean isStreaming) throws Exception {
        Object body = exchange.getIn().getBody();
        if (body instanceof WrappedFile) {
            body = ((WrappedFile) body).getFile();
        }
        if (body instanceof File) {
            // a File body is streamed to Solr; SolrParam.* headers become request parameters
            ContentStreamUpdateRequest updateRequest = new ContentStreamUpdateRequest(getRequestHandler());
            updateRequest.addFile((File) body);
            for (Map.Entry entry : exchange.getIn().getHeaders().entrySet()) {
                if (((String) entry.getKey()).startsWith("SolrParam.")) {
                    String paramName = ((String) entry.getKey()).substring("SolrParam.".length());
                    updateRequest.setParam(paramName, entry.getValue().toString());
                }
            }
            if (isStreaming) {
                updateRequest.process(this.streamingSolrServer);
            } else {
                updateRequest.process(this.solrServer);
            }
        } else if (body instanceof SolrInputDocument) {
            // a SolrInputDocument body is added directly
            UpdateRequest updateRequest = new UpdateRequest(getRequestHandler());
            updateRequest.add((SolrInputDocument) body);
            if (isStreaming) {
                updateRequest.process(this.streamingSolrServer);
            } else {
                updateRequest.process(this.solrServer);
            }
        } else {
            // otherwise, look for SolrField.* headers and build a document from them
            boolean hasSolrHeaders = false;
            Map headers = exchange.getIn().getHeaders();
            for (Map.Entry entry : exchange.getIn().getHeaders().entrySet()) {
                if (((String) entry.getKey()).startsWith("SolrField.")) {
                    hasSolrHeaders = true;
                    break;
                }
            }
            if (hasSolrHeaders) {
                UpdateRequest updateRequest = new UpdateRequest(getRequestHandler());
                SolrInputDocument doc = new SolrInputDocument();
                for (Map.Entry entry : exchange.getIn().getHeaders().entrySet()) {
                    if (((String) entry.getKey()).startsWith("SolrField.")) {
                        String fieldName = ((String) entry.getKey()).substring("SolrField.".length());
                        doc.setField(fieldName, entry.getValue());
                    }
                }
                updateRequest.add(doc);
                if (isStreaming) {
                    updateRequest.process(this.streamingSolrServer);
                } else {
                    updateRequest.process(this.solrServer);
                }
            } else if (body instanceof String) {
                // a String body is treated as raw update XML, wrapped in <add> if needed
                String bodyAsString = (String) body;
                if (!(bodyAsString.startsWith("<add"))) {
                    bodyAsString = "<add>" + bodyAsString + "</add>";
                }
                DirectXmlRequest xmlRequest = new DirectXmlRequest(getRequestHandler(), bodyAsString);
                if (isStreaming) {
                    this.streamingSolrServer.request(xmlRequest);
                } else {
                    this.solrServer.request(xmlRequest);
                }
            } else {
                throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
                        "unable to find data in Exchange to update Solr");
            }
        }
    }

    private String getRequestHandler() {
        String requestHandler = getEndpoint().getRequestHandler();
        return ((requestHandler == null) ? "/update" : requestHandler);
    }

    public SolrEndpoint getEndpoint() {
        return ((SolrEndpoint) super.getEndpoint());
    }
}
```
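process() dispatches on the "SolrOperation" header (INSERT, INSERT_STREAMING, DELETE_BY_ID, DELETE_BY_QUERY, ADD_BEAN, COMMIT, ROLLBACK, OPTIMIZE), and insert() picks an update strategy from the body type: a File becomes a ContentStreamUpdateRequest, a SolrInputDocument becomes an UpdateRequest, "SolrField.*" headers are turned into a new document, and a plain String is wrapped in <add> and sent as a DirectXmlRequest. A usage sketch driving the producer through a ProducerTemplate (the endpoint URI and field names are placeholders):

```java
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.solr.common.SolrInputDocument;

// A sketch only; it assumes camel-solr 2.9.4 on the classpath and a Solr
// server reachable at the placeholder URI below.
public class SolrProducerUsageSketch {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.start();
        ProducerTemplate template = context.createProducerTemplate();
        String uri = "solr://localhost:8983/solr";

        // SolrInputDocument body -> handled by the second branch of insert()
        SolrInputDocument doc = new SolrInputDocument();
        doc.setField("id", "doc-1");
        doc.setField("title", "hello camel-solr");
        template.sendBodyAndHeader(uri, doc, "SolrOperation", "INSERT");

        // COMMIT needs no body; process() simply calls solrServer.commit()
        template.sendBodyAndHeader(uri, null, "SolrOperation", "COMMIT");

        context.stop();
    }
}
```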