client-cpp  0.6.1
AbstractHttpChannel.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014 CyberVision, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 
18 
19 #ifndef ABSTRACTHTTPCHANNEL_HPP_
20 #define ABSTRACTHTTPCHANNEL_HPP_
21 
22 #include "kaa/KaaDefaults.hpp"
23 
24 #if defined(KAA_DEFAULT_BOOTSTRAP_HTTP_CHANNEL) || defined (KAA_DEFAULT_OPERATION_HTTP_CHANNEL)
25 
26 
29 #include "kaa/http/HttpClient.hpp"
30 
32 #include "kaa/logging/Log.hpp"
41 #include "kaa/KaaThread.hpp"
42 
43 #include <cstdint>
44 
45 namespace kaa {
46 
47 template <ChannelType Type>
48 class AbstractHttpChannel : public ImpermanentDataChannel {
49 public:
50  AbstractHttpChannel(IKaaChannelManager *channelManager, const KeyPair& clientKeys);
51  virtual ~AbstractHttpChannel() { }
52 
53  virtual void sync(TransportType type);
54  virtual void syncAll();
55  virtual void syncAck(TransportType type);
56  virtual void setMultiplexer(IKaaDataMultiplexer *multiplexer);
57  virtual void setDemultiplexer(IKaaDataDemultiplexer *demultiplexer);
58  virtual void setServer(IServerInfoPtr server);
59 
60  virtual ChannelType getChannelType() const { return Type; }
61 
62  virtual void setConnectivityChecker(ConnectivityCheckerPtr checker) {}
63 
64 protected:
65  typedef std::shared_ptr<AbstractServerInfo<Type> > AbstractServerInfoPtr;
66 
67  HttpDataProcessor* getHttpDataProcessor() { return &httpDataProcessor_; }
68  virtual void processTypes(const std::map<TransportType, ChannelDirection>& types, KAA_MUTEX_UNIQUE& lock);
69 
70 private:
71  virtual std::shared_ptr<IHttpRequest> createRequest(AbstractServerInfoPtr server, const std::vector<std::uint8_t>& body) = 0;
72  virtual std::string retrieveResponse(const IHttpResponse& response) = 0;
73 
74 private:
75  KeyPair clientKeys_;
76 
77  bool lastConnectionFailed_;
78 
79  IKaaDataMultiplexer *multiplexer_;
80  IKaaDataDemultiplexer *demultiplexer_;
81  IKaaChannelManager *channelManager_;
82  AbstractServerInfoPtr currentServer_;
83  HttpDataProcessor httpDataProcessor_;
84  HttpClient httpClient_;
85  KAA_MUTEX_DECLARE(channelGuard_);
86 };
87 
88 template <ChannelType Type>
89 AbstractHttpChannel<Type>::AbstractHttpChannel(IKaaChannelManager *channelManager, const KeyPair& clientKeys)
90  : clientKeys_(clientKeys), lastConnectionFailed_(false)
91  , multiplexer_(nullptr), demultiplexer_(nullptr), channelManager_(channelManager) {}
92 
93 template <ChannelType Type>
94 void AbstractHttpChannel<Type>::processTypes(const std::map<TransportType, ChannelDirection>& types, KAA_MUTEX_UNIQUE& lock)
95 {
96  AbstractServerInfoPtr server = currentServer_;
97 
98  const auto& bodyRaw = multiplexer_->compileRequest(types);
99  // Creating HTTP request using the given data
100  std::shared_ptr<IHttpRequest> postRequest = createRequest(server, bodyRaw);
101 
102  KAA_MUTEX_UNLOCKING("channelGuard_");
103  KAA_UNLOCK(lock);
104  KAA_MUTEX_UNLOCKED("channelGuard_");
105  try {
106  // Sending http request
107  auto response = httpClient_.sendRequest(*postRequest);
108 
109  KAA_MUTEX_LOCKING("channelGuard_");
110  KAA_MUTEX_UNIQUE_DECLARE(lockInternal, channelGuard_);
111  KAA_MUTEX_LOCKED("channelGuard_");
112 
113  // Retrieving the avro data from the HTTP response
114  const std::string& processedResponse = retrieveResponse(*response);
115  lastConnectionFailed_ = false;
116 
117  KAA_MUTEX_UNLOCKING("channelGuard_");
118  KAA_UNLOCK(lockInternal);
119  KAA_MUTEX_UNLOCKED("channelGuard_");
120 
121  if (!processedResponse.empty()) {
122  demultiplexer_->processResponse(
123  std::vector<std::uint8_t>(reinterpret_cast<const std::uint8_t *>(processedResponse.data()),
124  reinterpret_cast<const std::uint8_t *>(processedResponse.data() + processedResponse.size())));
125  }
126  } catch (std::exception& e) {
127  KAA_LOG_ERROR(boost::format("Connection failed, server %1%:%2%: %3%") % server->getHost() % server->getPort() % e.what());
128 
129  KAA_MUTEX_LOCKING("channelGuard_");
130  KAA_MUTEX_UNIQUE_DECLARE(lockInternal, channelGuard_);
131  KAA_MUTEX_LOCKED("channelGuard_");
132 
133  lastConnectionFailed_ = true;
134 
135  KAA_MUTEX_UNLOCKING("channelGuard_");
136  KAA_UNLOCK(lockInternal);
137  KAA_MUTEX_UNLOCKED("channelGuard_");
138 
139  channelManager_->onServerFailed(server);
140  }
141 }
142 
143 template <ChannelType Type>
144 void AbstractHttpChannel<Type>::sync(TransportType type)
145 {
146  const auto& supportedTypes = getSupportedTransportTypes();
147  auto it = supportedTypes.find(type);
148  if (it != supportedTypes.end() && (it->second == ChannelDirection::UP || it->second == ChannelDirection::BIDIRECTIONAL)) {
149  KAA_MUTEX_LOCKING("channelGuard_");
150  KAA_MUTEX_UNIQUE_DECLARE(lock, channelGuard_);
151  KAA_MUTEX_LOCKED("channelGuard_");
152  if (currentServer_) {
153  processTypes(std::map<TransportType, ChannelDirection>({ { type, it->second } }), lock);
154  } else {
155  lastConnectionFailed_ = true;
156  KAA_LOG_WARN(boost::format("Can't sync channel %1%. Server is null") % getId());
157  }
158  } else {
159  KAA_LOG_ERROR(boost::format("Unsupported transport type for channel %1%") % getId());
160  }
161 }
162 
163 template <ChannelType Type>
164 void AbstractHttpChannel<Type>::syncAll()
165 {
166  KAA_MUTEX_LOCKING("channelGuard_");
167  KAA_MUTEX_UNIQUE_DECLARE(lock, channelGuard_);
168  KAA_MUTEX_LOCKED("channelGuard_");
169  if (currentServer_) {
170  processTypes(getSupportedTransportTypes(), lock);
171  } else {
172  lastConnectionFailed_ = true;
173  KAA_LOG_WARN(boost::format("Can't sync channel %1%. Server is null") % getId());
174  }
175 }
176 
177 template <ChannelType Type>
178 void AbstractHttpChannel<Type>::syncAck(TransportType type)
179 {
180  KAA_LOG_DEBUG(boost::format("Sync ack operation is not supported by channel %1%.") % getId());
181 }
182 
183 template <ChannelType Type>
184 void AbstractHttpChannel<Type>::setMultiplexer(IKaaDataMultiplexer *multiplexer)
185 {
186  KAA_MUTEX_LOCKING("channelGuard_");
187  KAA_MUTEX_UNIQUE_DECLARE(lock, channelGuard_);
188  KAA_MUTEX_LOCKED("channelGuard_");
189  multiplexer_ = multiplexer;
190 }
191 
192 template <ChannelType Type>
193 void AbstractHttpChannel<Type>::setDemultiplexer(IKaaDataDemultiplexer *demultiplexer)
194 {
195  KAA_MUTEX_LOCKING("channelGuard_");
196  KAA_MUTEX_UNIQUE_DECLARE(lock, channelGuard_);
197  KAA_MUTEX_LOCKED("channelGuard_");
198  demultiplexer_ = demultiplexer;
199 }
200 
201 template <ChannelType Type>
202 void AbstractHttpChannel<Type>::setServer(IServerInfoPtr server)
203 {
204  if (server->getChannelType() == getChannelType()) {
205  KAA_MUTEX_LOCKING("channelGuard_");
206  KAA_MUTEX_UNIQUE_DECLARE(lock, channelGuard_);
207  KAA_MUTEX_LOCKED("channelGuard_");
208  currentServer_ = std::dynamic_pointer_cast<AbstractServerInfo<Type>, IServerInfo>(server);
209  std::shared_ptr<IEncoderDecoder> encDec(new RsaEncoderDecoder(clientKeys_.first, clientKeys_.second, currentServer_->getPublicKey()));
210  httpDataProcessor_.setEncoderDecoder(encDec);
211  if (lastConnectionFailed_) {
212  lastConnectionFailed_ = false;
213  processTypes(getSupportedTransportTypes(), lock);
214  }
215  } else {
216  KAA_LOG_ERROR(boost::format("Invalid server info for channel %1%") % getId());
217  }
218 }
219 
220 }
221 
222 #endif
223 
224 #endif /* ABSTRACTHTTPCHANNEL_HPP_ */
225 
#define KAA_UNLOCK(mtx)
Definition: KaaThread.hpp:62
std::pair< Botan::MemoryVector< std::uint8_t >, std::string > KeyPair
Definition: KeyUtils.hpp:29
std::shared_ptr< IConnectivityChecker > ConnectivityCheckerPtr
#define KAA_LOG_DEBUG(message)
Definition: Log.hpp:59
#define KAA_MUTEX_UNIQUE
Definition: KaaThread.hpp:58
#define KAA_MUTEX_UNLOCKING(mutex_name)
Definition: Log.hpp:115
#define KAA_MUTEX_LOCKED(mutex_name)
Definition: Log.hpp:114
#define KAA_LOG_ERROR(message)
Definition: Log.hpp:74
std::shared_ptr< IServerInfo > IServerInfoPtr
Definition: IServerInfo.hpp:53
#define KAA_LOG_WARN(message)
Definition: Log.hpp:69
#define KAA_MUTEX_DECLARE(name)
Definition: KaaThread.hpp:71
#define KAA_MUTEX_UNIQUE_DECLARE(name, mtx)
Definition: KaaThread.hpp:75
#define KAA_MUTEX_UNLOCKED(mutex_name)
Definition: Log.hpp:116
#define KAA_MUTEX_LOCKING(mutex_name)
Definition: Log.hpp:113