My Avatar

JacketWoo

King Birds Fly Fast and Early!

基于HTTP2提升apns推送性能实践

2018年05月09日 星期二, 发表于 北京

如果您对本文有任何的建议或者疑问, 可以给我发邮件(seuxfw1990@gmail.com)或者在 这里给我提 Issues, 谢谢! :)

1. 背景

​ 与android设备相比,苹果设备上对应用的活动限制得更加严格,要求几乎全部的消息都要通过苹果的官方推送平台apns来推送。但是apns的服务器在美国,国内访问apns服务器的延迟很高,据不严格的统计,在北京的机房内,串行访问apns服务的qps差不多为3~5左右。为此,当要获得较高的推送性能的话,常见的方法有两种:

但是上面两个方法,都绕不开成本的问题。

​ apns在2015年之后,推送消息的接口换成了使用http2的方式(具体参考官方文档)。http1.x要求后一个请求必须在上一个应答返回之后才能发起,与此不同,http2允许多个逻辑通道复用一个物理连接,可以充分利用物理连接的传输能力(具体参考http2的介绍)。

​ 基于此,我们项目的优化思路是:在访问apns的机器上采用多线程,每个线程以短连接的方式使用一个物理连接(实验后,证明以长连接的方式使用多个物理连接效果不好),在每个连接上使用http2的多路复用特性。下面介绍demo实现和性能优化结果。

2. 实现

​ 我们项目是采用C++语言实现的,服务器操作系统是CentOS 7,基于的主要第三方库是libcurl。

2.1 编译libcurl库

a. 安装必须的依赖库,如ssl,nghttp2,z等,直接yum安装就行

a. 下载libcurl的源代码

c. libcurl中的某部分代码

​ 我们选择curl-7_56_0这个tag,当然也可以选择其他的tag或者分支,只要支持HTTP2就行

1
> git checkout curl-7_56_0 //我们选择curl-7_56_0这个tag

b. 编译curl库

1
2
> ./configure --with-nghttp2=/usr/local --with-ssl -prefix=~/install/curl
> make && make install

d. 编写demo

​ libcurl有两种使用方式,分别为easy interface和multi interface(具体参见curl官网关于multi interface的介绍),我们这里使用multi interface的方式,因为它允许使用multiplexed功能,并提供了事件驱动的功能(epoll),不需要我们自己实现了(当然,你也可以基于epoll自己实现更加灵活的io多路复用),multi_demo文件为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>

#include <string>
#include <iostream>
#include <vector>
#include <utility>
#include <atomic>

#include "curl/curl.h"

void LogErrMsg(CURL* curl, CURLcode code, long http_code) {
  char* url = NULL;
  curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &url); 
  fprintf(stderr, "curl failed, time: %llu, url: %s, curl code: %d, http code: %u\n", time(NULL), url, code, http_code);
}

void CheckMultiInfo(CURLM* mc) {
  CURLMsg* m = NULL;
  CURL* c = NULL;
  int32_t n_m;
  while (m = curl_multi_info_read(mc, &n_m)) {
    static std::atomic<int32_t> i(0);
    fprintf(stderr, "time: %llu, exit num: %u, left: %u\n", time(NULL), ++i, n_m);

    // currently, only CURLMSG_DONE is surpported
    if (m->msg == CURLMSG_DONE) {
      c = m->easy_handle;
      long http_code;
      if (m->data.result != CURLE_OK
          || curl_easy_getinfo(c, CURLINFO_RESPONSE_CODE, &http_code) != CURLE_OK
          || http_code != 200) {
        LogErrMsg(c, m->data.result, http_code);
      }
      curl_multi_remove_handle(mc, c);
      curl_easy_cleanup(c);
    }
  }
}


int32_t SocketCB(CURL* c,
                 curl_socket_t fd,
                 int32_t act,
                 void* up,
                 void* sp) {
  if (!up) {
    return -1;
  }
  int32_t ep_fd = *reinterpret_cast<int32_t*>(up);
  struct epoll_event et;
  et.events = 0; 
  et.data.fd = fd;
  if (act == CURL_POLL_REMOVE) {
    epoll_ctl(ep_fd, EPOLL_CTL_DEL, fd, NULL);
    return 0;
  }
  if (act & CURL_POLL_IN) {
    et.events |= EPOLLIN;
  }
  if (act & CURL_POLL_OUT) {
    et.events |= EPOLLOUT;
  }
  if (epoll_ctl(ep_fd, EPOLL_CTL_ADD, fd, &et) == -1) {
    epoll_ctl(ep_fd, EPOLL_CTL_MOD, fd, &et);
  }
  return 0;
}

int32_t TimerCB(CURLM* mc,
               long tm_ms,
               void* up) {
  if (!up) {
    return -1;
  }
  if (!tm_ms) {
    int32_t lf_hd = 0;
    curl_multi_socket_action(mc, CURL_SOCKET_TIMEOUT, 0, &lf_hd);
    CheckMultiInfo(mc);
  }
  
  //fprintf(stderr, "TimerCB, tm_ms: %u\n", tm_ms);
  if (tm_ms >= 1000
      || !tm_ms) {
    tm_ms = 200;
  }
  *reinterpret_cast<uint32_t*>(up) = tm_ms;
  return 0;
}

class ApnsRequest {
#define MAX_EVENT_ONCE 1024
public:
  ApnsRequest(const std::string& cert_file,
              const std::string& cert_pwd) :
      mc_(NULL),
      cert_file_(cert_file),
      cert_pwd_(cert_pwd),
      ep_fd_(0),
      tm_ms_(1000) {
  }
  virtual ~ApnsRequest() {}
  int32_t Init();
  int32_t Deinit();
  int32_t MultiSend(const std::vector<std::pair<std::string, std::string> >& items);
    
private:
  CURLM* mc_;
  std::string cert_file_;  
  std::string cert_pwd_;
  int32_t ep_fd_;
  struct epoll_event ep_ets_[MAX_EVENT_ONCE]; 
  uint32_t tm_ms_;
};

int32_t ApnsRequest::Init() {
  if ((ep_fd_ = epoll_create(MAX_EVENT_ONCE)) == -1) {
    return -1;
  } 

  mc_ = curl_multi_init();
  if (!mc_) {
    return -1;
  }
  curl_multi_setopt(mc_, CURLMOPT_SOCKETFUNCTION, SocketCB);
  curl_multi_setopt(mc_, CURLMOPT_SOCKETDATA, &ep_fd_);
  curl_multi_setopt(mc_, CURLMOPT_TIMERFUNCTION, TimerCB);
  curl_multi_setopt(mc_, CURLMOPT_TIMERDATA, &tm_ms_);
  curl_multi_setopt(mc_, CURLMOPT_MAX_HOST_CONNECTIONS, 1); //这里是设置最大连接数,其实这里设置了也没有用,因为对于multiplexed功能只会只用一个物理连接,这里设置为1,值是为了说明只使用了一个物理连接
  curl_multi_setopt(mc_, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
  //curl_multi_setopt(mc_, CURLMOPT_MAX_PIPELINE_LENGTH, 1000); //使用了http2的multiplexed功能后,libcurl会将max_pipe_len设为0,即不做限制,所以这里设置与不设置都没有意义
  return 0;
}

int32_t ApnsRequest::Deinit() {
  curl_multi_cleanup(mc_);
  if (!ep_fd_) {
    close(ep_fd_);
  }
}

int32_t ApnsRequest::MultiSend(const std::vector<std::pair<std::string, std::string> >& items) {
  CURL* c = NULL;
  for (const std::pair<std::string, std::string>& item : items) {
    if (!(c = curl_easy_init())) {
      break;
    }
    curl_easy_setopt(c, CURLOPT_URL, item.first.data());
    curl_easy_setopt(c, CURLOPT_POST, 1L);
    curl_easy_setopt(c, CURLOPT_POSTFIELDS, item.second.data());
    curl_easy_setopt(c, CURLOPT_POSTFIELDSIZE, item.second.size());
    curl_easy_setopt(c, CURLOPT_SSLCERT, cert_file_.data());
    curl_easy_setopt(c, CURLOPT_SSLCERTPASSWD, cert_pwd_.data());
    //curl_easy_setopt(c, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE);
    curl_easy_setopt(c, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2);

    curl_easy_setopt(c, CURLOPT_PIPEWAIT, 1L);

    curl_slist* headers = NULL;
    headers = curl_slist_append(headers, "apns-topic: com.webteam.imsdk");
    curl_easy_setopt(c, CURLOPT_HTTPHEADER, headers);
    
    curl_multi_add_handle(mc_, c);
  }
  if (!c) {
    return -1;
  }
  int32_t left_hd, num;
  do {
    num = epoll_wait(ep_fd_, reinterpret_cast<epoll_event*>(ep_ets_), MAX_EVENT_ONCE, tm_ms_);
    //fprintf(stderr, "tm_ms: %u, num: %u, left_hd: %u\n", tm_ms_, num, left_hd);
    if (num == -1 && errno == EINTR) {
      num = 0;
    }
    if (!num) {
      curl_multi_socket_action(mc_, CURL_SOCKET_TIMEOUT, 0, &left_hd);
      CheckMultiInfo(mc_); 
      continue;
    }
    for (int32_t idx = 0; idx != num; ++idx) {
      curl_multi_socket_action(mc_, ep_ets_[idx].data.fd, 0, &left_hd);
    }
    CheckMultiInfo(mc_); 
  } while (left_hd);
  return 0;
}

int32_t main(int32_t argc, char* argv[]) {
  std::string token = "348f762dffc0cd73b91958aea7c2d096dbba6560eb0bebc9a5254d75823e8b7b";
  std::string cert_file = "~/for_sample/cert/apns-pro.pem";
  std::string host = "api.push.apple.com";


  std::string path = "/3/device/" + token;
  std::string url = "https://" + host + path;
  std::string msg = "{\"aps\":{\"alert\":\"hi ninjacn\",\"badge\":42}}";
  

  std::vector<std::pair<std::string, std::string> > send_items0, send_items; 
  for (int32_t idx = 0; idx != 150; ++idx) {
    send_items0.push_back(std::pair<std::string, std::string>{url, msg});
  }
  for (int32_t i=0; i != 10000; ++i) { //在循环内使用短连接而不使用长连接,是因为试验后表明,短连接的速度更快;具体原因可能与libcurl内关于http2的调度,以及apns的服务器内实现有关系,猜测是apns服务器在某个连接上限流的可能性更大些。
    ApnsRequest sender(cert_file, "abc123");
    sender.Init();
    sender.MultiSend(send_items0);
    sender.Deinit();
  }
  return 0;
}

e. 编译demo

1
> g++ -g -O0 --std=c++11 -o multi_demo multi_demo.cc -I~/install/curl/include/ ~/install/curl/lib/libcurl.a -lcrypto -lz -lssl -lnghttp2

3. 结果

上面demo使用的技术点为:1)http2的multiplexed特性;2)短连接

下面给出通常的使用easy interface的情况,也是单线程,easy_demo文件为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

#include <string>
#include <iostream>

#include "curl/curl.h"

#define NUM 100

static
void dump(const char *text, int num, unsigned char *ptr, size_t size,
          char nohex)
{
  size_t i;
  size_t c;
  unsigned int width = 0x10;
 
  if(nohex)
    /* without the hex output, we can fit more on screen */ 
    width = 0x40;
 
  fprintf(stderr, "%d %s, %ld bytes (0x%lx)\n",
          num, text, (long)size, (long)size);
 
  for(i = 0; i<size; i += width) {
 
    fprintf(stderr, "%4.4lx: ", (long)i);
 
    if(!nohex) {
      /* hex not disabled, show it */ 
      for(c = 0; c < width; c++)
        if(i + c < size)
          fprintf(stderr, "%02x ", ptr[i + c]);
        else
          fputs("   ", stderr);
    }
 
    for(c = 0; (c < width) && (i + c < size); c++) {
      /* check for 0D0A; if found, skip past and start a new line of output */ 
      if(nohex && (i + c + 1 < size) && ptr[i + c] == 0x0D &&
         ptr[i + c + 1] == 0x0A) {
        i += (c + 2 - width);
        break;
      }
      fprintf(stderr, "%c",
              (ptr[i + c] >= 0x20) && (ptr[i + c]<0x80)?ptr[i + c]:'.');
      /* check again for 0D0A, to avoid an extra \n if it's at width */ 
      if(nohex && (i + c + 2 < size) && ptr[i + c + 1] == 0x0D &&
         ptr[i + c + 2] == 0x0A) {
        i += (c + 3 - width);
        break;
      }
    }
    fputc('\n', stderr); /* newline */ 
  }
}
 
static
int my_trace(CURL *handle, curl_infotype type,
             char *data, size_t size,
             void *userp)
{
  char timebuf[20];
  const char *text;
//  int num = hnd2num(handle);
  int num = 0;
  static time_t epoch_offset;
  static int    known_offset;
  struct timeval tv;
  time_t secs;
  struct tm *now;

  (void)handle; /* prevent compiler warning */
  (void)userp;

  gettimeofday(&tv, NULL);
  if(!known_offset) {
    epoch_offset = time(NULL) - tv.tv_sec;
    known_offset = 1;
  }
  secs = epoch_offset + tv.tv_sec;
  now = localtime(&secs);  /* not thread safe but we don't care */
  snprintf(timebuf, sizeof(timebuf), "%02d:%02d:%02d.%06ld",
           now->tm_hour, now->tm_min, now->tm_sec, (long)tv.tv_usec);

  switch(type) {
  case CURLINFO_TEXT:
    fprintf(stderr, "%s [%d] Info: %s", timebuf, num, data);
    /* FALLTHROUGH */
  default: /* in case a new one is introduced to shock us */
    return 0;

  case CURLINFO_HEADER_OUT:
    text = "=> Send header";
    break;
  case CURLINFO_DATA_OUT:
    text = "=> Send data";
    break;
  case CURLINFO_SSL_DATA_OUT:
    text = "=> Send SSL data";
    break;
  case CURLINFO_HEADER_IN:
    text = "<= Recv header";
    break;
  case CURLINFO_DATA_IN:
    text = "<= Recv data";
    break;
  case CURLINFO_SSL_DATA_IN:
    text = "<= Recv SSL data";
    break;
  }

  dump(text, num, (unsigned char *)data, size, 1);
  return 0;
}

int main(int argc, char* argv[]) {
  std::string token = "348f762dffc0cd73b91958aea7c2d096dbba6560eb0bebc9a5254d75823e8b7b";
  std::string cert_file = "~/for_sample/cert/apns-pro.pem";
  std::string host = "api.push.apple.com";
  //std::string token = "48dfd4ac621749f79722cf119d2bb20674b2f3e489f0fe8f7b75a6e9fb31890f";
  //std::string cert_file = "~/for_sample/cert/apns-pro.pem";
  //std::string host = "api.development.push.apple.com";

  std::string path = "/3/device/" + token;
  std::string url = "https://" + host + path;
  std::string msg = "{\"aps\":{\"alert\":\"hi ninjacn\",\"badge\":42}}";
 
  CURL* curl = curl_easy_init(); 
  if (!curl) {
    fprintf(stderr, "init curl failed\n", strerror(errno));
    return -1;
  }
  
  std::cout << url << std::endl;
  curl_easy_setopt(curl, CURLOPT_URL, url.data());
  curl_easy_setopt(curl, CURLOPT_POST, 1L);
  curl_easy_setopt(curl, CURLOPT_POSTFIELDS, msg.data());
  curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msg.size());
  curl_easy_setopt(curl, CURLOPT_SSLCERT, cert_file.data());
  curl_easy_setopt(curl, CURLOPT_SSLCERTPASSWD, "abc123");

  curl_slist* headers = NULL;
  headers = curl_slist_append(headers, "apns-topic: com.webteam.imsdk");
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);

  //CURLcode ret = curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_0);
  CURLcode ret = curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2);
  fprintf(stderr, "version res: %u\n", ret);
  //curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
  //curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, my_trace);
 
  time_t st = time(NULL);
  for (int32_t i = 0; i != NUM; ++i) {
    if (curl_easy_perform(curl) != CURLE_OK) {
      fprintf(stderr, "curl easy perform error: %s\n", strerror(errno));
      return -1;
    }
    std::cout << i << std::endl;
  }
  std::cout << "qps: " << NUM*1.0 / (time(NULL)-st) << std::endl;
  curl_easy_cleanup(curl);
  return 0; 
}

也是按照相同的方式编译

1
g++ -g -O0 --std=c++11 -o easy_demo easy_demo.cc -I~/install/curl/include/  ~/install/curl/lib/libcurl.a -lcrypto -lz -lssl -lnghttp2

运行后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
第一次:
> ./easy_demo
...
qps: 5.26

第二次:
> ./easy_demo
...
qps: 3.33

第三次:
> ./easy_demo
...
qps: 4.55

现在运行multi_demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
> multi_demo 2>log

第一次:
> t1=`head -n 1 log | awk -F'[: ,]' '{print $3;}'` && line=`tail -n 1 log` && t2=`echo $line | awk -F'[: ,]' '{print $3;}'` && num=`echo $line | awk -F'[: ,]' '{print $8;}'` && echo $t1, $t2, $num && ((qps=num/(t2-t1))) && echo "qps: "$qps
1525789052, 1525789081, 3000
qps: 103

第二次:
> t1=`head -n 1 log | awk -F'[: ,]' '{print $3;}'` && line=`tail -n 1 log` && t2=`echo $line | awk -F'[: ,]' '{print $3;}'` && num=`echo $line | awk -F'[: ,]' '{print $8;}'` && echo $t1, $t2, $num && ((qps=num/(t2-t1))) && echo "qps: "$qps
1525789110, 1525789116, 750
qps: 125

第三次:
> t1=`head -n 1 log | awk -F'[: ,]' '{print $3;}'` && line=`tail -n 1 log` && t2=`echo $line | awk -F'[: ,]' '{print $3;}'` && num=`echo $line | awk -F'[: ,]' '{print $8;}'` && echo $t1, $t2, $num && ((qps=num/(t2-t1))) && echo "qps: "$qps
1525789110, 1525789140, 3150
qps: 105

从对结果中可以看到在,qps性能有20~40的提升

结果说明:

a. 使用的都是单线程

b. 都只使用了一个物理连接

c. 在同一台机器上进行的测试

4. 结论

a. 利用libcurl的multi-interface接口,b. 使用apns的http2接口;c. 使用短连接,可以提升从中国大陆向apns推送消息的性能,提升服务大约可以为20~40倍。

注意:

在multi_demo.cc中

1
2
3
 for (int32_t idx = 0; idx != 150; ++idx) {
    send_items0.push_back(std::pair<std::string, std::string>{url, msg});
  }

代码里150这个值,是我根据环境调试出来,超过150可能会卡顿,小于150,又不能发挥出http2的多路复用的优势。虽然在不同的机房、网络环境下测试,这个值都可以适用,但是我觉得这个值应该与具体的网络等环境有关,在项目中应该要动态调整。