Gelyktydige Versoek Verwerking vir HTTP Bediener in C: POSIX Liggewigproses Verbeterings
In hierdie artikel kyk ons na optimering van die C HTTP bediener deur gebruik te maak van POSIX liggewigprosesse om HTTP versoeke gelyktydig te hanteer. Hier implementeer ons ‘n liggewigproses poel wat dan in die hooflus wat versoeke hanteer gebruik word. Die vorige weergawe van die bediener het versoeke sekwensieel een na die ander hanteer. Die oorspronklike artikel is beskikbaar hier.
Vir die poel implementasie begin ons deur ‘n koppelvlak te skep by ./src/thpool.h
#ifndef THPOOL_H
#define THPOOL_H
typedef struct ThreadPool ThreadPool;
ThreadPool* thpool_init(int num_threads);
int thpool_add_work(ThreadPool *pool, void (*func)(void *), void *arg);
int thpool_get_num_threads(ThreadPool *pool);
int thpool_get_queue_count(ThreadPool *pool);
void thpool_destroy(ThreadPool *pool);
#endifDie ThreadPool struktuur word later in die implementasie volledig gedefinieer. Die thpool_init funksie initialiseer ‘n poel met ‘n spesifieke grootte, die thpool_add_work funksie voeg ‘n taak by die poel by, thpool_get_num_threads kry die aantal liggewig prosesse, thpool_get_queue_count kry die hoeveelheid take wat tans besig is, en thpool_destroy sluit die poel af.
Volgende kan die poel implementeer word in ./src/thpool.c soos volg:
#include "thpool.h"
#include <pthread.h>
#include <stdlib.h>
typedef struct Task {
void (*func)(void *);
void *arg;
} Task;
struct ThreadPool {
pthread_t *threads;
Task *queue;
int queue_size;
int head, tail, count;
int num_threads;
int shutdown;
pthread_mutex_t lock;
pthread_cond_t notify;
};
static void* thread_do(void *arg) {
ThreadPool *pool = (ThreadPool *)arg;
while (1) {
pthread_mutex_lock(&pool->lock);
while (pool->count == 0 && !pool->shutdown) {
pthread_cond_wait(&pool->notify, &pool->lock);
}
if (pool->shutdown) {
pthread_mutex_unlock(&pool->lock);
break;
}
Task task = pool->queue[pool->head];
pool->head = (pool->head + 1) % pool->queue_size;
pool->count--;
pthread_mutex_unlock(&pool->lock);
(*(task.func))(task.arg);
}
return NULL;
}
ThreadPool* thpool_init(int num_threads) {
ThreadPool *pool = malloc(sizeof(ThreadPool));
pool->num_threads = num_threads;
pool->queue_size = 1024;
pool->queue = malloc(sizeof(Task) * pool->queue_size);
pool->threads = malloc(sizeof(pthread_t) * num_threads);
pool->head = pool->tail = pool->count = pool->shutdown = 0;
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->notify, NULL);
for (int i = 0; i < num_threads; i++) {
pthread_create(&pool->threads[i], NULL, thread_do, pool);
}
return pool;
}
int thpool_add_work(ThreadPool *pool, void (*func)(void *), void *arg) {
pthread_mutex_lock(&pool->lock);
if (pool->count == pool->queue_size) {
pthread_mutex_unlock(&pool->lock);
return -1; // Queue vol
}
pool->queue[pool->tail].func = func;
pool->queue[pool->tail].arg = arg;
pool->tail = (pool->tail + 1) % pool->queue_size;
pool->count++;
pthread_cond_signal(&pool->notify);
pthread_mutex_unlock(&pool->lock);
return 0;
}
int thpool_get_num_threads(ThreadPool *pool) {
if (!pool) return 0;
return pool->num_threads;
}
int thpool_get_queue_count(ThreadPool *pool) {
if (!pool) return 0;
int count;
pthread_mutex_lock(&pool->lock);
count = pool->count;
pthread_mutex_unlock(&pool->lock);
return count;
}
void thpool_destroy(ThreadPool *pool) {
pthread_mutex_lock(&pool->lock);
pool->shutdown = 1;
pthread_cond_broadcast(&pool->notify);
pthread_mutex_unlock(&pool->lock);
for (int i = 0; i < pool->num_threads; i++) {
pthread_join(pool->threads[i], NULL);
}
free(pool->threads);
free(pool->queue);
free(pool);
}Hier volg ‘n beskrywing van die implentasie:
static void* thread_do(void *arg): Hierdie is die interne werker funksie wat elke liggewigproses in die poel uitvoer. Dit bevat ‘n oneindige lus wat wag vir take. Ons gebruik ‘nmutexlockenconditionveranderlike om veilig te wag totdat daar werk in die tou is of tot wanneer die stelsel afskakel. Sodra ‘n taak beskikbaar is, haal dit die funksiewyser en argument uit die tou en voer dit uit.ThreadPool* thpool_init(int num_threads): Hierdie funksie skep en konfigureer die poel. Dit ken geheue toe vir dieThreadPoolstruktuur, die tou take, en die ID van die liggewig prosesse. Dit inisialiseer die ko├Ărdinasie-meganismes. Dit begin die gespesifiseerde aantal prosesse (pthread_create) wat dadelik inthread_dofunksie begin wag vir werk.int thpool_add_work(ThreadPool *pool, void (*func)(void *), void *arg): Voeg ‘n nuwe taak by die poel se tou. Dit sluit die tou om data korrupsie te voorkom. As daar plek is, stoor dit die funksiewyser en sy data in die tou. Dit stuur ‘n sein (pthread_cond_signal) om een van die slapende liggewigprosesse wakker te maak sodat dit die werk kan begin doen.int thpool_get_num_threads(ThreadPool *pool): ‘n Eenvoudige helper funksie wat die totale aantal liggewigprosse terugstuur wat tydens inisialisering geskep is.int thpool_get_queue_count(ThreadPool *pool): Gee die huidige aantal take terug wat in die tou wag om verwerk te word. Dit gebruik ‘n slot (lock) om te verseker dat die telling akkuraat gelees word terwyl ander liggewigprosesse dalk take verwyder.void thpool_destroy(ThreadPool *pool): Maak die poel skoon en stop alle aktiwiteit. Dit stel dieshutdownvlag na1en stuurn sein uit om alle prosesse wakker te maak sodat hulle hul lusse kan verlaat. Dit wag vir elke proses om klaar te maak (pthread_join). Laastens stel dit alle geheue vry (free`) om geheue lekkasies te voorkom.
Volgende kan on ./src/worker.h defineer:
#ifndef WORKER_H
#define WORKER_H
#include "server.h"
typedef struct {
int client_fd;
Server *server;
} WorkerArgs;
void handle_client_task(void *arg);
#endifDie handle_client_task funksie is die funksie wat deur die poel uitgevoer word. Implementeer handle_client_task in ./src/worker.c soos volg:
#include "worker.h"
#include "util.h"
#include "router.h"
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <stdio.h>
void handle_client_task(void *arg) {
WorkerArgs *w_args = (WorkerArgs *)arg;
int client_fd = w_args->client_fd;
Server *server = w_args->server;
char buf[2048];
char res_buf[4096];
if (read_http_header(client_fd, buf, sizeof(buf)) > 0) {
HTTPRequest req;
if (parse_http_request(buf, &req, server) == 0) {
route_handler handler = route_get_handler(server, req.path);
if (handler) {
char *body = handler(client_fd, &req);
if (body) {
char mime[32] = "text/plain";
if (body[0] == '{' || body[0] == '[') strcpy(mime, "application/json");
else if (body[0] == '<') strcpy(mime, "text/html");
build_http_response(res_buf, sizeof(res_buf), 200, "OK",
"Content-Type", mime, "Connection", "close", NULL, body);
send(client_fd, res_buf, strlen(res_buf), 0);
}
} else {
const char *not_found = "{\"error\": \"Not Found\"}";
build_http_response(res_buf, sizeof(res_buf), 404, "Not Found",
"Content-Type", "application/json", NULL, not_found);
send(client_fd, res_buf, strlen(res_buf), 0);
}
}
}
close(client_fd);
free(w_args); // Vry die argumente wat in server.c geskep is
}Die implementasie hierbo is wat ons oorspronklik in ./src/server.c gehad het om HTTP versoeke te hanteer.
Die nuwe poel kan gekoppel word in die bediener se data struktuur in ./src/server.h soos volg:
...
#include "thpool.h" // nuut ingesluit
typedef struct Server {
int port;
int server_fd;
struct sockaddr_in address;
int backlog;
int is_running;
struct Route *routes;
int use_pool; // nuwe veld om liggewig proses aan of af te sit
ThreadPool *pool; // nuwe veld vir die liggewig proses-poel
} Server;
...Die ThreadPool *pool veld is bygevoeg om te verwys na die poel. Ons voeg ook ‘n veld use_pool in om te onderskei of ons soos van tevore of met ‘n aktiewe poel HTTP versoeke hanteer.
Nou kan ons die hanteering van versoeke in ./src/server.c verander om gebruik te maak van die poel:
...
#include "worker.h" // nuwe insluiting vir WorkerArgs
...
void server_start(Server *server) {
...
if (server->use_pool && server->pool) {
int pool_size = thpool_get_num_threads(server->pool);
printf("Server listening on port %d (Multi-threaded mode: %d threads)...\n", server->port, pool_size);
} else {
printf("Server listening on port %d (Single-threaded mode)...\n", server->port);
}
...
while (server->is_running) {
...
if (client_fd < 0) {
...
}
// berei die argumente voor vir die werker logika
WorkerArgs *args = malloc(sizeof(WorkerArgs));
if (!args) {
fprintf(stderr, "Failed to allocate memory for WorkerArgs\n");
close(client_fd);
continue;
}
args->client_fd = client_fd;
args->server = server;
// in die geval waar ons liggewig prosesse gebruik
if (server->use_pool && server->pool) {
// handig taak oor aan poel
if (thpool_add_work(server->pool, handle_client_task, args) != 0) {
fprintf(stderr, "ThreadPool queue full, dropping connection\n");
close(client_fd);
free(args);
}
} else {
// in die geval waar ons nie liggewig prosesse gebruiik nie
handle_client_task(args);
}
}
}
void server_cleanup(Server *server) {
if (server) {
...
// maak seker die poel word vernietig indien dit bestaan
if (server->pool) {
thpool_destroy(server->pool);
}
free(server);
}
}Sodra ‘n kliĂ«nt koppel (client_fd), word geheue dinamies toegeken vir WorkerArgs. Hierdie struktuur verpak (client_fd) en die bediener konteks (*server) sodat dit as ‘n enkele eenheid aan die werker funksie oorgedra kan word. Ons roep dan thpool_add_work met die poel, werker funksie, en argumente om die konneksie in die agtergrond te hanteer.
Ons behou die vorige funksionaliteit waar ons nie gebruik maak van liggewigprosesse nie as server->user_pool vals is of server->pool ‘n waarde van NULL het.
Nou kan ons ‘n aanpassing maak in die hoof funksie in ./src/main.c:
#include "server.h"
#include <stdio.h>
#include <signal.h>
#include "router.h"
#include "routes.h"
#include "thpool.h"
#include <stdlib.h>
...
int main(int argc, char *argv[]) {
// skep bediener
app_server = server_create();
// bespeur argument
int num_threads = (argc > 1) ? atoi(argv[1]) : 0;
if (num_threads > 0) {
// met liggewig prosess
app_server->use_pool = 1;
app_server->pool = thpool_init(num_threads);
} else {
// sonder liggewig prosess
app_server->use_pool = 0;
app_server->pool = NULL;
}
// konfigureer bediener
server_configure(app_server);
// registreer sein hanteerder
...
// voeg roetes by
...
// begin bediener
server_start(app_server);
// maak skoon
server_cleanup(app_server);
return 0;
}Nou kan mens die HTTP bediener begin op verskillende maniere, b.v.:
$ ./bin/server
Server listening on port 8080 (Single-threaded mode)...
$ ./bin/server 0
Server listening on port 8080 (Single-threaded mode)...
$ ./bin/server 16
Server listening on port 8080 (Multi-threaded mode: 16 threads)...
Verstelling deur HTTP koppelvlak
‘n Nuwe versoek hanteer funksie kan bygevoeg word om die poel tydens die verloop van die program te verstel. Voeg die volgende funksie by ./src/routes.h:
#ifndef ROUTES_H
#define ROUTES_H
#include "router.h"
...
char* handle_toggle_pool(int client_fd, HTTPRequest *req);
#endifHierdie funksie handle_toggle_pool kan soos volg implementeer word in ./src/routes.c:
...
char* handle_toggle_pool(int client_fd, HTTPRequest *req) {
char threads_str[10];
Server *s = req->server;
if (get_query_param(req->query, "threads", threads_str, sizeof(threads_str))) {
int num_threads = atoi(threads_str);
// stop en vernietig enige bestaande poel
if (s->pool) {
thpool_destroy(s->pool);
s->pool = NULL;
}
if (num_threads > 0) {
// initialiseer 'n nuwe poel met die nuwe konfigurasie
s->pool = thpool_init(num_threads);
s->use_pool = 1;
static char msg[128];
snprintf(msg, sizeof(msg), "{\"message\": \"ThreadPool reinitialized with %d threads\"}", num_threads);
return msg;
} else {
// indien 0, skakel oor na ou hanterings modus
s->use_pool = 0;
return "{\"message\": \"ThreadPool disabled. Running in single-threaded mode.\"}";
}
}
return "{\"error\": \"Missing 'threads' parameter. Example: /config/pool?threads=4\"}";
}
Onthou om die nuwe roete te registreer in ./src/main.c:
int main(int argc, char *argv[]) {
...
route_add(app_server, "/config/pool", handle_toggle_pool);
...
}
Volgende kan ons die handle_status hanteerder funksie soos volg implementeer in ./src/routes.c om die poel konfigurasie af te voer:
char* handle_status(int client_fd, HTTPRequest *req) {
static char status_json[512];
Server *s = req->server;
const char *mode = (s->use_pool && s->pool) ? "multi-threaded" : "single-threaded";
int pool_size = thpool_get_num_threads(s->pool);
int queued_tasks = thpool_get_queue_count(s->pool);
snprintf(status_json, sizeof(status_json),
"{"
"\"status\": \"running\","
"\"mode\": \"%s\","
"\"threadpool\": {"
"\"active\": %s,"
"\"threads\": %d,"
"\"queued_tasks\": %d"
"}"
"}",
mode,
(s->use_pool ? "true" : "false"),
pool_size,
queued_tasks
);
return status_json;
}
Op hierdie punt kan ons die bediener herkompileer deur make te loop. Nou kan ons die liggewigproses konfigurasie verstel terwyl die bediener loop. Hier volg 'n voorbeeld:
$ curl "http://localhost:8080/config/pool?threads=0"
{
"message": "ThreadPool disabled. Running in single-threaded mode."
}
$ curl "http://localhost:8080/api/status"
{
"status": "running",
"mode": "single-threaded",
"threadpool": {
"active": false,
"threads": 0,
"queued_tasks": 0
}
}
$ curl "http://localhost:8080/config/pool?threads=0"
{
"message": "ThreadPool reinitialized with 8 threads"
}
$ curl "http://localhost:8080/api/status"
{
"status": "running",
"mode": "multi-threaded",
"threadpool": {
"active": true,
"threads": 8,
"queued_tasks": 0
}
}
Poel Toetse
Hier volg ‘n toets waar ons die poel konfigureer en die bombardier program gebruik om ‘n klomp gelyktydige versoeke na die bediener te maak:
$ curl "http://localhost:8080/config/pool?threads=0"
{"message": "ThreadPool disabled. Running in single-threaded mode."}
$ bombardier http://localhost:8080/
Bombarding http://localhost:8080/ for 10s using 125 connection(s)
Done!
Statistics Avg Stdev Max
Reqs/sec 11227.95 2895.56 16601.69
Latency 11.95ms 134.91ms 5.03s
HTTP codes:
1xx - 0, 2xx - 112291, 3xx - 0, 4xx - 0, 5xx - 0
others - 402
Errors:
dial tcp 127.0.0.1:8080: i/o timeout - 402
Throughput: 1.09MB/s
$ curl "http://localhost:8080/config/pool?threads=16"
{"message": "ThreadPool reinitialized with 16 threads"}
$ bombardier http://localhost:8080/
Bombarding http://localhost:8080/ for 10s using 125 connection(s)
Done!
Statistics Avg Stdev Max
Reqs/sec 24422.66 2142.76 29178.74
Latency 5.33ms 74.66ms 2.72s
HTTP codes:
1xx - 0, 2xx - 244777, 3xx - 0, 4xx - 0, 5xx - 0
others - 47
Errors:
dial tcp 127.0.0.1:8080: i/o timeout - 47
Throughput: 3.03MB/s
In hierdie toets lopie kan 'n mens sien dat vir 'n enkele proses daar 11227 versoeke per sekonde hanteer word en waar dit liggewigproses poel verstel is na 16 liggewigprosesse dit 24422 versoeke per sekonde was. Daar is ook minder mislukte hanterings waar die poel gebruik was.
Opsomming
Ons het van liggewigprosesse gebruik gemaak om die bediener te optimeer sodat dit versoeke gelyktydig kan hanteer.
In die vorige artikel sal ons kyk na ‘n lĂȘerbediener implementasie.


Comments
Post a Comment