client-cpp  0.0.1-SNAPSHOT
AbstractHttpChannel.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014 CyberVision, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef ABSTRACTHTTPCHANNEL_HPP_
18 #define ABSTRACTHTTPCHANNEL_HPP_
19 
20 
23 #include "kaa/http/HttpClient.hpp"
24 
26 #include "kaa/logging/Log.hpp"
32 #include <boost/bind.hpp>
33 #include <boost/cstdint.hpp>
34 #include <boost/thread/mutex.hpp>
38 
39 namespace kaa {
40 
41 template <ChannelType Type>
43 public:
44  AbstractHttpChannel(IKaaChannelManager *channelManager, const KeyPair& clientKeys);
45  virtual ~AbstractHttpChannel() { }
46 
47  virtual void sync(TransportType type);
48  virtual void syncAll();
49  virtual void syncAck(TransportType type);
50  virtual void setMultiplexer(IKaaDataMultiplexer *multiplexer);
51  virtual void setDemultiplexer(IKaaDataDemultiplexer *demultiplexer);
52  virtual void setServer(IServerInfoPtr server);
53 
54  virtual ChannelType getChannelType() const { return Type; }
55 
57 
58 protected:
59  typedef boost::shared_ptr<AbstractServerInfo<Type> > AbstractServerInfoPtr;
60 
61  HttpDataProcessor* getHttpDataProcessor() { return &httpDataProcessor_; }
62  virtual void processTypes(const std::map<TransportType, ChannelDirection>& types, boost::unique_lock<boost::mutex>& lock);
63 
64 private:
65  virtual boost::shared_ptr<IHttpRequest> createRequest(AbstractServerInfoPtr server, const std::vector<boost::uint8_t>& body) = 0;
66  virtual std::string retrieveResponse(const IHttpResponse& response) = 0;
67 
68 private:
69  KeyPair clientKeys_;
70 
71  bool lastConnectionFailed_;
72 
73  IKaaDataMultiplexer *multiplexer_;
74  IKaaDataDemultiplexer *demultiplexer_;
75  IKaaChannelManager *channelManager_;
76  AbstractServerInfoPtr currentServer_;
77  HttpDataProcessor httpDataProcessor_;
78  HttpClient httpClient_;
79  boost::mutex channelGuard_;
80 };
81 
82 template <ChannelType Type>
84  : clientKeys_(clientKeys), lastConnectionFailed_(false)
85  , multiplexer_(nullptr), demultiplexer_(nullptr), channelManager_(channelManager) {}
86 
87 template <ChannelType Type>
88 void AbstractHttpChannel<Type>::processTypes(const std::map<TransportType, ChannelDirection>& types, boost::unique_lock<boost::mutex>& lock)
89 {
90  AbstractServerInfoPtr server = currentServer_;
91 
92  const auto& bodyRaw = multiplexer_->compileRequest(types);
93  // Creating HTTP request using the given data
94  boost::shared_ptr<IHttpRequest> postRequest = createRequest(server, bodyRaw);
95 
96  KAA_MUTEX_UNLOCKING("channelGuard_");
97  lock.unlock();
98  KAA_MUTEX_UNLOCKED("channelGuard_");
99  try {
100  // Sending http request
101  auto response = httpClient_.sendRequest(*postRequest);
102 
103  KAA_MUTEX_LOCKING("channelGuard_");
104  boost::unique_lock<boost::mutex> lockInternal(channelGuard_);
105  KAA_MUTEX_LOCKED("channelGuard_");
106 
107  // Retrieving the avro data from the HTTP response
108  const std::string& processedResponse = retrieveResponse(*response);
109  lastConnectionFailed_ = false;
110 
111  KAA_MUTEX_UNLOCKING("channelGuard_");
112  lockInternal.unlock();
113  KAA_MUTEX_UNLOCKED("channelGuard_");
114 
115  if (!processedResponse.empty()) {
116  demultiplexer_->processResponse(
117  std::vector<boost::uint8_t>(reinterpret_cast<const boost::uint8_t *>(processedResponse.data()),
118  reinterpret_cast<const boost::uint8_t *>(processedResponse.data() + processedResponse.size())));
119  }
120  } catch (std::exception& e) {
121  KAA_LOG_ERROR(boost::format("Connection failed, server %1%:%2%: %3%") % server->getHost() % server->getPort() % e.what());
122 
123  KAA_MUTEX_LOCKING("channelGuard_");
124  boost::unique_lock<boost::mutex> lockInternal(channelGuard_);
125  KAA_MUTEX_LOCKED("channelGuard_");
126 
127  lastConnectionFailed_ = true;
128 
129  KAA_MUTEX_UNLOCKING("channelGuard_");
130  lockInternal.unlock();
131  KAA_MUTEX_UNLOCKED("channelGuard_");
132 
133  channelManager_->onServerFailed(server);
134  }
135 }
136 
137 template <ChannelType Type>
139 {
140  const auto& supportedTypes = getSupportedTransportTypes();
141  auto it = supportedTypes.find(type);
142  if (it != supportedTypes.end() && (it->second == ChannelDirection::UP || it->second == ChannelDirection::BIDIRECTIONAL)) {
143  KAA_MUTEX_LOCKING("channelGuard_");
144  boost::unique_lock<boost::mutex> lock(channelGuard_);
145  KAA_MUTEX_LOCKED("channelGuard_");
146  if (currentServer_) {
147  processTypes(std::map<TransportType, ChannelDirection>({ { type, it->second } }), lock);
148  } else {
149  lastConnectionFailed_ = true;
150  KAA_LOG_WARN(boost::format("Can't sync channel %1%. Server is null") % getId());
151  }
152  } else {
153  KAA_LOG_ERROR(boost::format("Unsupported transport type for channel %1%") % getId());
154  }
155 }
156 
157 template <ChannelType Type>
159 {
160  KAA_MUTEX_LOCKING("channelGuard_");
161  boost::unique_lock<boost::mutex> lock(channelGuard_);
162  KAA_MUTEX_LOCKED("channelGuard_");
163  if (currentServer_) {
164  processTypes(getSupportedTransportTypes(), lock);
165  } else {
166  lastConnectionFailed_ = true;
167  KAA_LOG_WARN(boost::format("Can't sync channel %1%. Server is null") % getId());
168  }
169 }
170 
171 template <ChannelType Type>
173 {
174  KAA_LOG_DEBUG(boost::format("Sync ack operation is not supported by channel %1%.") % getId());
175 }
176 
177 template <ChannelType Type>
179 {
180  KAA_MUTEX_LOCKING("channelGuard_");
181  boost::unique_lock<boost::mutex> lock(channelGuard_);
182  KAA_MUTEX_LOCKED("channelGuard_");
183  multiplexer_ = multiplexer;
184 }
185 
186 template <ChannelType Type>
188 {
189  KAA_MUTEX_LOCKING("channelGuard_");
190  boost::unique_lock<boost::mutex> lock(channelGuard_);
191  KAA_MUTEX_LOCKED("channelGuard_");
192  demultiplexer_ = demultiplexer;
193 }
194 
195 template <ChannelType Type>
197 {
198  if (server->getChannelType() == getChannelType()) {
199  KAA_MUTEX_LOCKING("channelGuard_");
200  boost::unique_lock<boost::mutex> lock(channelGuard_);
201  KAA_MUTEX_LOCKED("channelGuard_");
202  currentServer_ = boost::dynamic_pointer_cast<AbstractServerInfo<Type>, IServerInfo>(server);
203  boost::shared_ptr<IEncoderDecoder> encDec(new RsaEncoderDecoder(clientKeys_.first, clientKeys_.second, currentServer_->getPublicKey()));
204  httpDataProcessor_.setEncoderDecoder(encDec);
205  if (lastConnectionFailed_) {
206  lastConnectionFailed_ = false;
207  processTypes(getSupportedTransportTypes(), lock);
208  }
209  } else {
210  KAA_LOG_ERROR(boost::format("Invalid server info for channel %1%") % getId());
211  }
212 }
213 
214 }
215 
216 #endif /* ABSTRACTHTTPCHANNEL_HPP_ */
virtual void setDemultiplexer(IKaaDataDemultiplexer *demultiplexer)
std::pair< Botan::MemoryVector< boost::uint8_t >, std::string > KeyPair
Definition: KeyUtils.hpp:28
#define KAA_LOG_DEBUG(message)
Definition: Log.hpp:43
#define KAA_MUTEX_UNLOCKING(mutex_name)
Definition: Log.hpp:81
#define KAA_MUTEX_LOCKED(mutex_name)
Definition: Log.hpp:80
#define KAA_LOG_ERROR(message)
Definition: Log.hpp:47
virtual void setServer(IServerInfoPtr server)
#define KAA_LOG_WARN(message)
Definition: Log.hpp:46
virtual void syncAck(TransportType type)
boost::shared_ptr< IServerInfo > IServerInfoPtr
Definition: IServerInfo.hpp:55
virtual void setConnectivityChecker(ConnectivityCheckerPtr checker)
virtual void sync(TransportType type)
#define KAA_MUTEX_UNLOCKED(mutex_name)
Definition: Log.hpp:82
virtual void processTypes(const std::map< TransportType, ChannelDirection > &types, boost::unique_lock< boost::mutex > &lock)
virtual ChannelType getChannelType() const
AbstractHttpChannel(IKaaChannelManager *channelManager, const KeyPair &clientKeys)
HttpDataProcessor * getHttpDataProcessor()
boost::shared_ptr< AbstractServerInfo< Type > > AbstractServerInfoPtr
boost::shared_ptr< IConnectivityChecker > ConnectivityCheckerPtr
virtual void setMultiplexer(IKaaDataMultiplexer *multiplexer)
#define KAA_MUTEX_LOCKING(mutex_name)
Definition: Log.hpp:79