NNG 通信模式

NNG 是 nanomsg 的继任版本,纯c语言开发,工作模式分为几种:

1,Pipeline (A One-Way Pipe)

单向通信,类似与生产者消费者模型的消息队列,消息从推方流向拉方。

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/pipeline0/pull.h>
#include <nng/protocol/pipeline0/push.h>

#define NODE0 "node0"
#define NODE1 "node1"

void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s
", func, nng_strerror(rv));
        exit(1);
}

int
node0(const char *url)
{
        nng_socket sock;
        int rv;

        if ((rv = nng_pull0_open(&sock)) != 0) {
                fatal("nng_pull0_open", rv);
        }
        if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
                fatal("nng_listen", rv);
        }
        for (;;) {
                char *buf = NULL;
                size_t sz;
                if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
                        fatal("nng_recv", rv);
                }
                printf("NODE0: RECEIVED "%s"
", buf); 
                nng_free(buf, sz);
        }
}

int
node1(const char *url, char *msg)
{
        int sz_msg = strlen(msg) + 1; //  too
        nng_socket sock;
        int rv;
        int bytes;

        if ((rv = nng_push0_open(&sock)) != 0) {
                fatal("nng_push0_open", rv);
        }
        if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
                fatal("nng_dial", rv);
        }
        printf("NODE1: SENDING "%s"
", msg);
        if ((rv = nng_send(sock, msg, strlen(msg)+1, 0)) != 0) {
                fatal("nng_send", rv);
        }
        sleep(1); // wait for messages to flush before shutting down
        nng_close(sock);
        return (0);
}

int
main(int argc, char **argv)
{
        if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
                return (node0(argv[2]));

        if ((argc > 2) && (strcmp(NODE1, argv[1]) == 0))
                return (node1(argv[2], argv[3]));

        fprintf(stderr, "Usage: pipeline %s|%s <URL> <ARG> ...
",
                NODE0, NODE1);
        return (1);
}

//gcc pipeline.c -lnng -o pipeline

2,Request/Reply (I ask, you answer)

请求-应答模式,如果不应答,请求方会一直请求。

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>

#define NODE0 "node0"
#define NODE1 "node1"
#define DATE "DATE"

void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s
", func, nng_strerror(rv));
        exit(1);
}

char *
date(void)
{
        time_t now = time(&now);
        struct tm *info = localtime(&now);
        char *text = asctime(info);
        text[strlen(text)-1] = ; // remove 

        return (text);
}

int
node0(const char *url)
{
        nng_socket sock;
        int rv;

        if ((rv = nng_rep0_open(&sock)) != 0) {
                fatal("nng_rep0_open", rv);
        }
          if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
                fatal("nng_listen", rv);
        }
        for (;;) {
                char *buf = NULL;
                size_t sz;
                if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
                        fatal("nng_recv", rv);
                }
                if ((sz == (strlen(DATE) + 1)) && (strcmp(DATE, buf) == 0)) {
                        printf("NODE0: RECEIVED DATE REQUEST
");
                        char *d = date();
                        printf("NODE0: SENDING DATE %s
", d);
                        if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) {
                                fatal("nng_send", rv);
                        }
                }
                nng_free(buf, sz);
        }
}

int
node1(const char *url)
{
        nng_socket sock;
        int rv;
        size_t sz;
        char *buf = NULL;

        if ((rv = nng_req0_open(&sock)) != 0) {
                fatal("nng_socket", rv);
        }
        if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
                fatal("nng_dial", rv);
        }
        printf("NODE1: SENDING DATE REQUEST %s
", DATE);
        if ((rv = nng_send(sock, DATE, strlen(DATE)+1, 0)) != 0) {
                fatal("nng_send", rv);
        }
        if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
                fatal("nng_recv", rv);
        }
        printf("NODE1: RECEIVED DATE %s
", buf);  
        nng_free(buf, sz);
        nng_close(sock);
        return (0);
}

int
main(const int argc, const char **argv)
{
        if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
                return (node0(argv[2]));

        if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0))
                return (node1(argv[2]));

      fprintf(stderr, "Usage: reqrep %s|%s <URL> ...
", NODE0, NODE1);
      return (1);
}

3,Pair (Two Way Radio)

一对一双向通信,类似对讲机

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>

#define NODE0 "node0"
#define NODE1 "node1"

void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s
", func, nng_strerror(rv));
        exit(1);
}

int
send_name(nng_socket sock, char *name)
{
        int rv;
        printf("%s: SENDING "%s"
", name, name);
        if ((rv = nng_send(sock, name, strlen(name) + 1, 0)) != 0) {
                fatal("nng_send", rv);
        }
        return (rv);
}

int
recv_name(nng_socket sock, char *name)
{
        char *buf = NULL;
        int rv;
        size_t sz;
        if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
                printf("%s: RECEIVED "%s"
", name, buf); 
                nng_free(buf, sz);
        }
        return (rv);
}

int
send_recv(nng_socket sock, char *name)
{
        int rv;
        if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 100)) != 0) {
                fatal("nng_setopt_ms", rv);
        }
        for (;;) {
                recv_name(sock, name);
                sleep(1);
                send_name(sock, name);
        }
}

int
node0(const char *url)
{
        nng_socket sock;
        int rv;
        if ((rv = nng_pair0_open(&sock)) != 0) {
                fatal("nng_pair0_open", rv);
        }
         if ((rv = nng_listen(sock, url, NULL, 0)) !=0) {
                fatal("nng_listen", rv);
        }
        return (send_recv(sock, NODE0));
}

int
node1(const char *url)
{
        nng_socket sock;
        int rv;
        sleep(1);
        if ((rv = nng_pair0_open(&sock)) != 0) {
                fatal("nng_pair0_open", rv);
        }
        if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
                fatal("nng_dial", rv);
        }
        return (send_recv(sock, NODE1));
}

int
main(int argc, char **argv)
{
        if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
                return (node0(argv[2]));

        if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0))
                return (node1(argv[2]));

        fprintf(stderr, "Usage: pair %s|%s <URL> <ARG> ...
", NODE0, NODE1);
        return 1;
}

4,Pub/Sub (Topics & Broadcast)

单向广播

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>

#define SERVER "server"
#define CLIENT "client"

void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s
", func, nng_strerror(rv));
}

char *
date(void)
{
        time_t now = time(&now);
        struct tm *info = localtime(&now);
        char *text = asctime(info);
        text[strlen(text)-1] = ; // remove 

        return (text);
}

int
server(const char *url)
{
        nng_socket sock;
        int rv;

        if ((rv = nng_pub0_open(&sock)) != 0) {
                fatal("nng_pub0_open", rv);
        }
        if ((rv = nng_listen(sock, url, NULL, 0)) < 0) {
                fatal("nng_listen", rv);
        }
        for (;;) {
                char *d = date();
                printf("SERVER: PUBLISHING DATE %s
", d);
                if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) {
                        fatal("nng_send", rv);
                }
                sleep(1);
        }
}

int
client(const char *url, const char *name)
{
        nng_socket sock;
        int rv;

        if ((rv = nng_sub0_open(&sock)) != 0) {
                fatal("nng_sub0_open", rv);
        }

        // subscribe to everything (empty means all topics)
        if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
                fatal("nng_setopt", rv);
        }
        if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
                fatal("nng_dial", rv);
        }
        for (;;) {
                char *buf = NULL;
                size_t sz;
                if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
                        fatal("nng_recv", rv);
                }
                printf("CLIENT (%s): RECEIVED %s
", name, buf); 
                nng_free(buf, sz);
        }
}

int
main(const int argc, const char **argv)
{
        if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0))
                return (server(argv[2]));

          if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0))
                return (client (argv[2], argv[3]));

        fprintf(stderr, "Usage: pubsub %s|%s <URL> <ARG> ...
",
            SERVER, CLIENT);
        return 1;
}

5,Survey (Everybody Votes)

用于多节点表决或者服务发现

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/survey0/survey.h>
#include <nng/protocol/survey0/respond.h>

#define SERVER "server"
#define CLIENT "client"
#define DATE   "DATE"

void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s
", func, nng_strerror(rv));
        exit(1);
}

char *
date(void)
{
        time_t now = time(&now);
        struct tm *info = localtime(&now);
        char *text = asctime(info);
        text[strlen(text)-1] = ; // remove 

        return (text);
}

int
server(const char *url)
{
        nng_socket sock;
        int rv;

        if ((rv = nng_surveyor0_open(&sock)) != 0) {
                fatal("nng_surveyor0_open", rv);
        }
        if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
                fatal("nng_listen", rv);
        }
        for (;;) {
                printf("SERVER: SENDING DATE SURVEY REQUEST
");
                if ((rv = nng_send(sock, DATE, strlen(DATE) + 1, 0)) != 0) {
                        fatal("nng_send", rv);
                }

                for (;;) {
                        char *buf = NULL;
                        size_t sz;
                        rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
                        if (rv == NNG_ETIMEDOUT) {
                                break;
                        }
                        if (rv != 0) {
                                fatal("nng_recv", rv);
                        }
                        printf("SERVER: RECEIVED "%s" SURVEY RESPONSE
",
                            buf); 
                        nng_free(buf, sz);
                }

                printf("SERVER: SURVEY COMPLETE
");
        }
}

int
client(const char *url, const char *name)
{
        nng_socket sock;
        int rv;

        if ((rv = nng_respondent0_open(&sock)) != 0) {
                fatal("nng_respondent0_open", rv);
        }
        if ((rv = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)) != 0) {
                fatal("nng_dial", rv);
        }
        for (;;) {
                char *buf = NULL;
                size_t sz;
                if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
                        printf("CLIENT (%s): RECEIVED "%s" SURVEY REQUEST
",
                            name, buf); 
                        nng_free(buf, sz);
                        char *d = date();
                        printf("CLIENT (%s): SENDING DATE SURVEY RESPONSE
",
                           name);
                        if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) {
                                fatal("nng_send", rv);
                        }
                }
        }
}

int
main(const int argc, const char **argv)
{
        if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0))
                return (server(argv[2]));

        if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0))
                return (client(argv[2], argv[3]));

        fprintf(stderr, "Usage: survey %s|%s <URL> <ARG> ...
",
            SERVER, CLIENT);
        return 1;
}

6, Bus (Routing)

网状连接通信,每个加入节点都可以发送/接受广播消息

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/bus0/bus.h>

void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s
", func, nng_strerror(rv));
        exit(1);
}

int
node(int argc, char **argv)
{
        nng_socket sock;
        int rv;
        size_t sz;

        if ((rv = nng_bus0_open(&sock)) != 0) {
                fatal("nng_bus0_open", rv);
        }
        if ((rv = nng_listen(sock, argv[2], NULL, 0)) != 0) {
                fatal("nng_listen", rv);
        }

        sleep(1); // wait for peers to bind
        if (argc >= 3) {
                for (int x = 3; x < argc; x++) {
                        if ((rv = nng_dial(sock, argv[x], NULL, 0)) != 0) {
                                fatal("nng_dial", rv);
                        }
                }
        }

        sleep(1); // wait for connects to establish

        // SEND
        sz = strlen(argv[1]) + 1; //  too
        printf("%s: SENDING %s ONTO BUS
", argv[1], argv[1]);
        if ((rv = nng_send(sock, argv[1], sz, 0)) != 0) {
                fatal("nng_send", rv);
        }

        // RECV
        for (;;) {
                char *buf = NULL;
                size_t sz;
                if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) !=0) {
                        if (rv == NNG_ETIMEDOUT) {
                                fatal("nng_recv", rv);
                        }
                }
                printf("%s: RECEIVED %s FROM BUS
", argv[1], buf); 
                nng_free(buf, sz);
        }
        nng_close(sock);
        return (0);
}

int
main(int argc, char **argv)
{
        if (argc >= 3) {
                return (node(argc, argv));
        }
        fprintf(stderr, "Usage: bus <NODE_NAME> <URL> <URL> ...
");
        return 1;
}
经验分享 程序员 微信小程序 职场和发展