首页

通过camel-solr源码包SolrEndpoint依赖apache的CommonsHttpSolrServer实现自定义solr的组件化应用

标签:自定义solr组件,CommonsHttpSolrServer,camel-solr,solrj,component     发布时间:2017-12-26   

一、前言

通过Apachesolr实现包依赖org.apache.solr.client.solrj.impl.CommonsHttpSolrServer(2.9.4)服务拓展使用org.apache.camel.component.solr.SolrEndpoint、org.apache.solr.common.SolrInputDocument.SolrProducer自定义apache solr端应用服务。

二、源码说明

1.SolrEndpoint部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package org.apache.camel.component.solr;
 
import java.util.Map;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.impl.StreamingUpdateSolrServer;
 
public class SolrEndpoint extends DefaultEndpoint
{
  private CommonsHttpSolrServer solrServer;
  private CommonsHttpSolrServer streamingSolrServer;
  private String requestHandler;
  private int streamingThreadCount;
  private int streamingQueueSize;
 
  public SolrEndpoint(String endpointUri, SolrComponent component, String address, Map<String, Object> parameters)
    throws Exception
  {
    super(endpointUri, component);
 
    this.solrServer = new CommonsHttpSolrServer("http://" + address);
    this.streamingQueueSize = getIntFromString((String)parameters.get("streamingQueueSize"), 10);
    this.streamingThreadCount = getIntFromString((String)parameters.get("streamingThreadCount"), 2);
    this.streamingSolrServer = new StreamingUpdateSolrServer("http://" + address, this.streamingQueueSize, this.streamingThreadCount);
  }
 
  public static int getIntFromString(String value, int defaultValue) {
    if ((value != null) && (value.length() > 0))
      return Integer.parseInt(value);
 
    return defaultValue;
  }
 
  public Producer createProducer() throws Exception
  {
    return new SolrProducer(this);
  }
 
  public Consumer createConsumer(Processor processor) throws Exception
  {
    throw new UnsupportedOperationException("Consumer not supported for Solr endpoint.");
  }
 
  public boolean isSingleton()
  {
    return true;
  }
 
  public CommonsHttpSolrServer getSolrServer() {
    return this.solrServer;
  }
 
  public CommonsHttpSolrServer getStreamingSolrServer() {
    return this.streamingSolrServer;
  }
 
  public void setStreamingSolrServer(CommonsHttpSolrServer streamingSolrServer) {
    this.streamingSolrServer = streamingSolrServer;
  }
 
  public void setMaxRetries(int maxRetries) {
    this.solrServer.setMaxRetries(maxRetries);
    this.streamingSolrServer.setMaxRetries(maxRetries);
  }
 
  public void setSoTimeout(int soTimeout) {
    this.solrServer.setSoTimeout(soTimeout);
    this.streamingSolrServer.setSoTimeout(soTimeout);
  }
 
  public void setConnectionTimeout(int connectionTimeout) {
    this.solrServer.setConnectionTimeout(connectionTimeout);
    this.streamingSolrServer.setConnectionTimeout(connectionTimeout);
  }
 
  public void setDefaultMaxConnectionsPerHost(int defaultMaxConnectionsPerHost) {
    this.solrServer.setDefaultMaxConnectionsPerHost(defaultMaxConnectionsPerHost);
    this.streamingSolrServer.setDefaultMaxConnectionsPerHost(defaultMaxConnectionsPerHost);
  }
 
  public void setMaxTotalConnections(int maxTotalConnections) {
    this.solrServer.setMaxTotalConnections(maxTotalConnections);
    this.streamingSolrServer.setMaxTotalConnections(maxTotalConnections);
  }
 
  public void setFollowRedirects(boolean followRedirects) {
    this.solrServer.setFollowRedirects(followRedirects);
    this.streamingSolrServer.setFollowRedirects(followRedirects);
  }
 
  public void setAllowCompression(boolean allowCompression) {
    this.solrServer.setAllowCompression(allowCompression);
    this.streamingSolrServer.setAllowCompression(allowCompression);
  }
 
  public void setRequestHandler(String requestHandler) {
    this.requestHandler = requestHandler;
  }
 
  public String getRequestHandler() {
    return this.requestHandler;
  }
 
  public int getStreamingThreadCount() {
    return this.streamingThreadCount;
  }
 
  public void setStreamingThreadCount(int streamingThreadCount) {
    this.streamingThreadCount = streamingThreadCount;
  }
 
  public int getStreamingQueueSize() {
    return this.streamingQueueSize;
  }
 
  public void setStreamingQueueSize(int streamingQueueSize) {
    this.streamingQueueSize = streamingQueueSize;
  }
}

2.SolrProducer部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package org.apache.camel.component.solr;
 
import java.io.File;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.WrappedFile;
import org.apache.camel.impl.DefaultProducer;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.request.DirectXmlRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
 
public class SolrProducer extends DefaultProducer
{
  private SolrServer solrServer;
  private SolrServer streamingSolrServer;
 
  public SolrProducer(SolrEndpoint endpoint)
  {
    super(endpoint);
    this.solrServer = endpoint.getSolrServer();
    this.streamingSolrServer = endpoint.getStreamingSolrServer();
  }
 
  public void process(Exchange exchange)
    throws Exception
  {
    String operation = (String)exchange.getIn().getHeader("SolrOperation");
 
    if (operation == null) {
      throw new IllegalArgumentException("SolrOperation header is missing");
    }
 
    if (operation.equalsIgnoreCase("INSERT"))
      insert(exchange, false);
    else if (operation.equalsIgnoreCase("INSERT_STREAMING"))
      insert(exchange, true);
    else if (operation.equalsIgnoreCase("DELETE_BY_ID"))
      this.solrServer.deleteById((String)exchange.getIn().getBody(String.class));
    else if (operation.equalsIgnoreCase("DELETE_BY_QUERY"))
      this.solrServer.deleteByQuery((String)exchange.getIn().getBody(String.class));
    else if (operation.equalsIgnoreCase("ADD_BEAN"))
      this.solrServer.addBean(exchange.getIn().getBody());
    else if (operation.equalsIgnoreCase("COMMIT"))
      this.solrServer.commit();
    else if (operation.equalsIgnoreCase("ROLLBACK"))
      this.solrServer.rollback();
    else if (operation.equalsIgnoreCase("OPTIMIZE"))
      this.solrServer.optimize();
    else
      throw new IllegalArgumentException("SolrOperation header value '" + operation + "' is not supported");
  }
 
  private void insert(Exchange exchange, boolean isStreaming)
    throws Exception
  {
    Object body = exchange.getIn().getBody();
    if (body instanceof WrappedFile) {
      body = ((WrappedFile)body).getFile();
    }
 
    if (body instanceof File)
    {
      ContentStreamUpdateRequest updateRequest = new ContentStreamUpdateRequest(getRequestHandler());
      updateRequest.addFile((File)body);
 
      for (Map.Entry entry : exchange.getIn().getHeaders().entrySet())
        if (((String)entry.getKey()).startsWith("SolrParam.")) {
          String paramName = ((String)entry.getKey()).substring("SolrParam.".length());
          updateRequest.setParam(paramName, entry.getValue().toString());
        }
 
 
      if (isStreaming)
        updateRequest.process(this.streamingSolrServer);
      else
        updateRequest.process(this.solrServer);
 
    }
    else if (body instanceof SolrInputDocument)
    {
      UpdateRequest updateRequest = new UpdateRequest(getRequestHandler());
      updateRequest.add((SolrInputDocument)body);
 
      if (isStreaming)
        updateRequest.process(this.streamingSolrServer);
      else
        updateRequest.process(this.solrServer);
 
    }
    else
    {
      boolean hasSolrHeaders = false;
      Map headers = exchange.getIn().getHeaders();
      for (Map.Entry entry : exchange.getIn().getHeaders().entrySet())
        if (((String)entry.getKey()).startsWith("SolrField.")) {
          hasSolrHeaders = true;
          break;
        }
 
 
      if (hasSolrHeaders)
      {
        UpdateRequest updateRequest = new UpdateRequest(getRequestHandler());
 
        SolrInputDocument doc = new SolrInputDocument();
        for (Map.Entry entry : exchange.getIn().getHeaders().entrySet())
          if (((String)entry.getKey()).startsWith("SolrField.")) {
            String fieldName = ((String)entry.getKey()).substring("SolrField.".length());
            doc.setField(fieldName, entry.getValue());
          }
 
        updateRequest.add(doc);
 
        if (isStreaming)
          updateRequest.process(this.streamingSolrServer);
        else
          updateRequest.process(this.solrServer);
 
      }
      else if (body instanceof String)
      {
        String bodyAsString = (String)body;
 
        if (!(bodyAsString.startsWith("<add"))) {
          bodyAsString = "<add>" + bodyAsString + "</add>";
        }
 
        DirectXmlRequest xmlRequest = new DirectXmlRequest(getRequestHandler(), bodyAsString);
 
        if (isStreaming)
          this.streamingSolrServer.request(xmlRequest);
        else
          this.solrServer.request(xmlRequest);
      }
      else {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "unable to find data in Exchange to update Solr");
      }
    }
  }
 
  private String getRequestHandler() {
    String requestHandler = getEndpoint().getRequestHandler();
    return ((requestHandler == null) ? "/update" : requestHandler);
  }
 
  public SolrEndpoint getEndpoint()
  {
    return ((SolrEndpoint)super.getEndpoint());
  }
}
<<热门下载>>