Flume怎么接收udp组播、多播的数据?

2021-03-26 19:09发布

6条回答

1、使用recvfrom接收

SOCKET socket1;

//InitWinsock();

WSADATA wsaData;

int iErrorCode;

if (WSAStartup(MAKEWORD(2,1),&wsaData)) //调用Windows Sockets DLL

{

printf("Winsock无法初始化!\n");

WSACleanup();

return;

}

printf("服务器开始创建SOCKET。\n");

struct sockaddr_in server;

int len =sizeof(server);

server.sin_family=AF_INET;

server.sin_port=htons(1009); //server的监听端口

server.sin_addr.s_addr=inet_addr("192.168.1.101"); //server的地址

socket1=socket(AF_INET,SOCK_DGRAM,0);

FILE * a = fopen("c:\\123.txt","w");

unsigned char buffer[20*1024]="\0";

recvfrom(socket1,(char*)buffer,sizeof(buffer),0,(struct sockaddr*)&server,&len);

2、使用ffmpeg中的avformat_open_input函数打开udp


GetDlgItemText(IDC_EDIT3,url);

//GetDlgItem(IDC_EDIT3)->GetWindowText(url);

int port=1008;

url.Format(_T("%s:%d"),url,port);

avformat_open_input(&pFormatCtx,url,NULL,NULL);



猫的想法不敢猜
3楼 · 2021-03-30 17:33

1、使用recvfrom接收 

SOCKET socket1; 

//InitWinsock(); 

WSADATA wsaData; 

int iErrorCode; 

if (WSAStartup(MAKEWORD(2,1),&wsaData)) //调用Windows Sockets DLL 

printf("Winsock无法初始化!\n"); 

WSACleanup(); 

return; 

printf("服务器开始创建SOCKET。\n"); 

struct sockaddr_in server; 

int len =sizeof(server); 

server.sin_family=AF_INET; 

server.sin_port=htons(1009); //server的监听端口 

server.sin_addr.s_addr=inet_addr("192.168.1.101"); //server的地址 

socket1=socket(AF_INET,SOCK_DGRAM,0); 

FILE * a = fopen("c:\\123.txt","w"); 

unsigned char buffer[20*1024]="\0"; 

recvfrom(socket1,(char*)buffer,sizeof(buffer),0,(struct sockaddr*)&server,&len); 

2、使用ffmpeg中的avformat_open_input函数打开udp 


GetDlgItemText(IDC_EDIT3,url); 

//GetDlgItem(IDC_EDIT3)->GetWindowText(url); 

int port=1008; 

url.Format(_T("%s:%d"),url,port); 

avformat_open_input(&pFormatCtx,url,NULL,NULL); 

————————————————

版权声明:本文为CSDN博主「东成西就之小黄人」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/panrong_pan/article/details/78481503

来源于网络仅供参考

小小123
4楼 · 2021-03-31 09:37

1、使用recvfrom接收 

SOCKET socket1; 

//InitWinsock(); 

WSADATA wsaData; 

int iErrorCode; 

if (WSAStartup(MAKEWORD(2,1),&wsaData)) //调用Windows Sockets DLL 

printf("Winsock无法初始化!\n"); 

WSACleanup(); 

return; 

printf("服务器开始创建SOCKET。\n"); 

struct sockaddr_in server; 

int len =sizeof(server); 

server.sin_family=AF_INET; 

server.sin_port=htons(1009); //server的监听端口 

server.sin_addr.s_addr=inet_addr("192.168.1.101"); //server的地址 

socket1=socket(AF_INET,SOCK_DGRAM,0); 

FILE * a = fopen("c:\\123.txt","w"); 

unsigned char buffer[20*1024]="\0"; 

recvfrom(socket1,(char*)buffer,sizeof(buffer),0,(struct sockaddr*)&server,&len); 

2、使用ffmpeg中的avformat_open_input函数打开udp 


GetDlgItemText(IDC_EDIT3,url); 

//GetDlgItem(IDC_EDIT3)->GetWindowText(url); 

int port=1008; 

url.Format(_T("%s:%d"),url,port); 

avformat_open_input(&pFormatCtx,url,NULL,NU


世界那么大我想去看看
5楼 · 2021-04-02 13:33

1、使用recvfrom接收 

SOCKET socket1; 

//InitWinsock(); 

WSADATA wsaData; 

int iErrorCode; 

if (WSAStartup(MAKEWORD(2,1),&wsaData)) //调用Windows Sockets DLL 

printf("Winsock无法初始化!\n"); 

WSACleanup(); 

return; 

printf("服务器开始创建SOCKET。\n"); 

struct sockaddr_in server; 

int len =sizeof(server); 

server.sin_family=AF_INET; 

server.sin_port=htons(1009); //server的监听端口 

server.sin_addr.s_addr=inet_addr("192.168.1.101"); //server的地址 

socket1=socket(AF_INET,SOCK_DGRAM,0); 

FILE * a = fopen("c:\\123.txt","w"); 

unsigned char buffer[20*1024]="\0"; 

recvfrom(socket1,(char*)buffer,sizeof(buffer),0,(struct sockaddr*)&server,&len); 

2、使用ffmpeg中的avformat_open_input函数打开udp 


GetDlgItemText(IDC_EDIT3,url); 

//GetDlgItem(IDC_EDIT3)->GetWindowText(url); 

int port=1008; 

url.Format(_T("%s:%d"),url,port); 

avformat_open_input(&pFormatCtx,url,NULL,NU


嘿呦嘿呦拔萝卜
6楼 · 2021-04-08 19:06

1、使用recvfrom接收

SOCKET socket1;

//InitWinsock();

WSADATA wsaData;

int iErrorCode;

if (WSAStartup(MAKEWORD(2,1),&wsaData)) //调用Windows Sockets DLL

{

printf("Winsock无法初始化!\n");

WSACleanup();

return;

}

printf("服务器开始创建SOCKET。\n");

struct sockaddr_in server;

int len =sizeof(server);

server.sin_family=AF_INET;

server.sin_port=htons(1009); //server的监听端口

server.sin_addr.s_addr=inet_addr("192.168.1.101"); //server的地址


腾腾家的宝贝
7楼 · 2021-09-20 14:33

网络中的一台主机如果希望能够接收到来自网络中其它主机发往某一个组播组的数据报,那么这么主机必须先加入该组播组,然后就可以从组地址接收数据包。在广域网中,还涉及到路由器支持组播路由等,但本文希望以一个最为简单的例子解释清楚协议栈关于组播的一个最为简单明了的工作过程,甚至,我们不希望涉及到 IGMP包。
    我们先从一个组播客户端的应用程序入手来解析组播的工作过程:
   

   #include
    #include
    #include
    #include
    #include "my_inet.h"
    #include
    #define MAXBUF 256
    #define PUERTO 5000
    #define GRUPO "224.0.1.1"
    int main(void)
    {
        int fd, n, r;
        struct sockaddr_in srv, cli;
        struct ip_mreq mreq;
        char buf[MAXBUF];
        memset( &srv, 0, sizeof(struct sockaddr_in) );
        memset( &cli, 0, sizeof(struct sockaddr_in) );
        memset( &mreq, 0, sizeof(struct ip_mreq) );
        srv.sin_family = MY_AF_INET;
        srv.sin_port = htons(PUERTO);
        if( inet_aton(GRUPO, &srv.sin_addr ) < 0>                 perror("inet_aton");
                return -1;
        }
        if( (fd = socket( MY_AF_INET, SOCK_DGRAM, MY_IPPROTO_UDP) ) < 0>             perror("socket");
            return -1;
        }
        if( bind(fd, (struct sockaddr *)&srv, sizeof(srv)) < 0>             perror("bind");
            return -1;
        }
        if (inet_aton(GRUPO, &mreq.imr_multiaddr) < 0>             perror("inet_aton");
            return -1;
        }
        inet_aton( "172.16.48.2", &(mreq.imr_interface) );
        if( setsockopt(fd, SOL_IP, IP_ADD_MEMBERSHIP, &mreq,sizeof(mreq)) < 0>             perror("setsockopt");
            return -1;
        }
        n = sizeof(cli);
        while(1){
            if( (r = recvfrom(fd, buf, MAXBUF, 0, (struct sockaddr *)&cli, (socklen_t*)&n)) < 0>                 perror("recvfrom");
            }else{
                buf[r] = 0;
                fprintf(stdout, "Mensaje desde %s: %s", inet_ntoa(cli.sin_addr), buf);
            }
        }
    }

    这是一个非常简单的组播客户端,它指定从组播组224.0.1.1的5000端口读数据,并显示在终端上,下面我们通过分析该程序来了解内核的工作过程。
    前面我们讲过,bind操作首先检查用户指定的端口是否可用,然后为socket的一些成员设置正确的值,并添加到哈希表myudp_hash中。然后,协议栈每次收到UDP数据,就会检查该数据报的源和目的地址,还有源和目的端口,在myudp_hash中找到匹配的socket,把该数据报放入该 socket的接收队列,以备用户读取。在这个程序中,bind操作把socket绑定到地址224.0.0.1:5000上, 该操作产生的直接结果就是,对于socket本身,下列值受影响:
    struct inet_sock{
        .rcv_saddr = 224.0.0.1;
        .saddr = 0.0.0.0;
        .sport = 5000;
        .daddr = 0.0.0.0;
        .dport = 0;
    }
    这五个数据表示,该套接字在发送数据包时,本地使用端口5000,本地可以使用任意一个网络设备接口,发往的目的地址不指定。在接收数据时,只接收发往IP地址224.0.0.1的端口为5000的数据。
    程序中,紧接着bind有一个setsockopt操作,它的作用是将socket加入一个组播组,因为socket要接收组播地址224.0.0.1的数据,它就必须加入该组播组。结构体struct ip_mreq mreq是该操作的参数,下面是其定义:
    struct ip_mreq
    {
        struct in_addr imr_multiaddr;   // 组播组的IP地址。
        struct in_addr imr_interface;   // 本地某一网络设备接口的IP地址。
    };
    一台主机上可能有多块网卡,接入多个不同的子网,imr_interface参数就是指定一个特定的设备接口,告诉协议栈只想在这个设备所在的子网中加入某个组播组。有了这两个参数,协议栈就能知道:在哪个网络设备接口上加入哪个组播组。为了简单起见,我们的程序中直接写明了IP地址:在 172.16.48.2所在的设备接口上加入组播组224.0.1.1。
    这个操作是在网络层上的一个选项,所以级别是SOL_IP,IP_ADD_MEMBERSHIP选项把用户传入的参数拷贝成了struct ip_mreqn结构体:
    struct ip_mreqn
    {
        struct in_addr  imr_multiaddr;
        struct in_addr  imr_address;
        int             imr_ifindex;
    };
    多了一个输入接口的索引,暂时被拷贝成零。
    该操作最终引发内核函数myip_mc_join_group执行加入组播组的操作。首先检查imr_multiaddr是否为合法的组播地址,然后根据 imr_interface的值找到对应的struct in_device结构。接下来就要为socket加入到组播组了,在inet_sock的结构体中有一个成员mc_list,它是一个结构体 struct ip_mc_socklist的链表,每一个节点代表socket当前正加入的一个组播组,该链表是有上限限制的,缺省值为 IP_MAX_MEMBERSHIPS(20),也就是说一个socket最多允许同时加入20个组播组。下面是struct ip_mc_socklist的定义:
    struct ip_mc_socklist
    {
        struct ip_mc_socklist   *next;
        struct ip_mreqn         multi;
        unsigned int            sfmode;     /* MCAST_{INCLUDE,EXCLUDE} */
        struct ip_sf_socklist   *sflist;
    };
    struct ip_sf_socklist
    {
        unsigned int    sl_max;
        unsigned int    sl_count;
        __u32           sl_addr[0];
    };
    除了multi成员,它还有一个源过滤机制。如果我们新添加的struct ip_mreqn已经存在于这个链表中(表示socket早就加入这个组播组了),那么不做任何事情,否则,创建一个新的struct ip_mc_socklist:
    struct ip_mc_socklist
    {
        .next = inet->mc_list;      //新节点放到链表头。
        .multi = 传入的参数;        //这是关键的组信息。
        .sfmode = MCAST_EXCLUDE;    //过滤掉sflist中的所有源。
        .sflist = NULL;             //没有源需要过滤。
    };


相关问题推荐

  • 回答 10

    创建test文件夹hadoop fs -mkdir /test

  • 回答 7

    Hadoop的三大核心组件分别是:1、HDFS(Hadoop Distribute File System):hadoop的数据存储工具。2、YARN(Yet Another Resource Negotiator,另一种资源协调者):Hadoop 的资源管理器。3、Hadoop MapReduce:分布式计算框架。HDFS是一个高度容错性的系统,适合部...

  • 回答 18

    hbase依靠HDFS来存储底层数据。Hadoop分布式文件系统(HDFS)为HBase提供了高可靠性的底层存储支持,HBase中的所有数据文件都存储在Hadoop HDFS文件系统上。

  • 回答 24

    HBase分布式数据库具有如下的显著特点:容量大:HBase分布式数据库中的表可以存储成千上万的行和列组成的数据。面向列:HBase是面向列的存储和权限控制,并支持独立检索。列存储,其数据在表中是按照某列存储的,根据数据动态的增加列,并且可以单独对列进行...

  • 回答 19

    解决问题的层面不一样首先,Hadoop和Apache Spark两者都是大数据框架,但是各自存在的目的不尽相同。Hadoop实质上更多是一个分布式数据基础设施: 它将巨大的数据集分派到一个由普通计算机组成的集群中的多个节点进行存储,意味着您不需要购买和维护昂贵的服务...

  • 回答 14

    1、HBase写快读慢,HBase的读取时长通常是几毫秒,而Redis的读取时长通常是几十微秒。性能相差非常大。2、HBase和Redis都支持KV类型。但是Redis支持List、Set等更丰富的类型。3、Redis支持的数据量通常受内存限制,而HBase没有这个限制,可以存储远超内存大小...

  • 回答 15

    列式存储格式是指以列为单位存储数据的数据存储格式,相比于传统的行式存储格式,它具有压缩比高、读I/O少(此处指可避免无意义的读I/O)等优点,目前被广泛应用于各种存储引擎中。对于HBase而言,它并不是一个列式存储引擎,而是列簇式存储引擎,即同一列簇中...

  • 回答 14

    一、简单理解Hadoop是一个大象:一个hadoop集群主要包含三个主要的模块:Mapreduce,hdfs,yarn。mapreduce是一个分离在合并的计算框架,注意他不是一个集群,而是一个编程框架。hdfs是一个分布式文件系统,是一个分布式集群,用于存放数据。yarn集群是负责集群...

  • 回答 12

    01 网络公开数据集02 数据报采集03 网络爬虫04 日志收集05 社会调查06 业务数据集07 埋点采集08 传感器采集09 数据交易平台10 个人数据收集

  • 回答 9

    1 Hadoop 各个目录的解释bin:Hadoop管理脚本和使用脚本所在目录, sbin目录下的脚本都是使用此目录下的脚本实现的。etc:Hadoop的所有配置文件所在的目录,所有hadoop的配置在etc/hadoop目录下include:对外提供的库的头文件lib :对外提供的动态编程库和静态...

  • 回答 4

    HDFS存储机制,包括HDFS的写入过程和读取过程两个部分: 1、写入过程:  1)客户端向namenode请求上传文件,namenode检查目标文件是否已存在,父目录是否存在。2)namenode返回是否可以上传。3)客户端请求第一个 block上传到哪几个datanode服务器上。4)nam...

  • Shuffle 发生在哪里?2021-04-28 20:11
    回答 4

    adoop核心:MapReduce原理。 MR的核心是shuffle,被称为奇迹发生的地方。 shuffle,弄乱,洗牌的意思。partition 分区,sort 排序,spill溢出,disk 磁盘下面是官方对shuffle的配图: phase 阶段,fetch 最终,merge 合并...

  • 回答 2

    Shuffle阶段分为两部分:Map端和Reduce端。一 map端shuffle过程;1-内存预排序:默认每个map有100M内存进行预排序(为了效率),超过阈值,会把内容写到磁盘;    此过程使用快速排序算法;2-根据key和reducer的数量进行分区和排序;首先根据数据所属的Parti...

  • 回答 3

    大数据时代需要1存储大量数据2快速的处理大量数据3从大量数据中进行分析 

  • Hadoop有哪几种模式?2021-04-27 20:20
    回答 3

    hadoop的四种模式。1、本地模式:本地模式就是解压源码包,不需要做任何的配置。通常用于开发调试,或者感受hadoop。2、伪分布模式:在学习当中一般都是使用这种模式,伪分布模式就是在一台机器的多个进程运行多个模块。虽然每一个模块都有相应的进程,但是却...

  • 回答 1

    进入和退出安全模式 [root@localhost bin]# ./hdfs dfsadmin -safemode enter15/08/03 07:26:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where ......

没有解决我的问题,去提问