2021-01-26 10:50发布
尝试使用pyflink的tabl api去连接kafka。但提交时异常报错:pyflink.util.exception.TableException:findAndCreateTableSource... 显示全部
尝试使用pyflink的tabl api去连接kafka。但提交时异常报错:pyflink.util.exception.TableException:findAndCreateTableSource failed
通过DDL建立源表的配置上应该是没有问题的
在此记录pyflink运行过程中遇到的问题以及解决方法。让小伙伴们少走弯路。
Q1:Nomodulenamed'encodings'
Causedby:java.io.IOException:Failedtoexecutethecommand:venv.zip/venv/bin/python-cimportpyflink;importos;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)),'bin'))
output:Couldnotfindplatformindependentlibraries
Couldnotfindplatformdependentlibraries
Considersetting$PYTHONHOMEto[:]
FatalPythonerror:initfsencoding:unabletoloadthefilesystemcodec
ModuleNotFoundError:Nomodulenamed'encodings'
问题分析:Python环境的问题,用virtualenv来管理虚拟环境时遇到。
问题解决:利用以下脚本(注意修改pyflink版本),使用miniconda3来管理虚拟环境即可解决。
wgethttps://ci.apache.org/projects/flink/flink-docs-release-1.11/downloads/setup-pyflink-virtual-env.shshsetup-pyflink-virtual-env.sh1.11.2
sourcevenv/bin/activate#激活虚拟环境
Q2:Nomodulenamed'encodings'
File"main.py",line2
SyntaxError:Non-ASCIIcharacter'\xe5'infilemain.pyonline3,butnoencodingdeclared;seehttp://www.python.org/peps/pep-0263.htmlfordetails
org.apache.flink.client.program.ProgramAbortException
问题分析:表面上是无法解析非ASCII码,实际上是因为当前的python版本错了。通过flinkrun来提交Python任务时,Flink会调用“python”命令,要求python的版本为3.5,3.6或者3.7中的一个。
问题解决:激活虚拟环境,使得运行python-V时显示的python版本为3.5,3.6或3.7
Q3:Couldnotfindanyfactoryforidentifier'kafka'
Causedby:org.apache.flink.table.api.ValidationException:Couldnotfindanyfactoryforidentifier'kafka'thatimplements'org.apache.flink.table.factories.DynamicTableSourceFactory'intheclasspath.org.apache.flink.client.program.ProgramAbortException
问题分析:没有指定kafkajar文件
问题解决:在flinkrun的时候,加入参数-jflink-sql-connector-kafka_2.11-1.11.2.jar,具体jar包的下载路径见ApacheKafkaSQLConnector,根据kafka版本选择jar包来下载。
Q4:Theparallelismmustbeapositivenumber:yarch
Theparallelismmustbeapositivenumber:yarch
问题分析:并行度要设置为正数
问题解决:在flinkrun的时候,加入参数-p1。但问题可能出在flink上。直接部署的flink没有问题,但是集成到CDH后,flink的配置参数发生了变化,导致无法以简单的flinkrun的方式来运行
Q5:Nomanifestfoundinjarfile'/xxxx/venv.zip'
org.apache.flink.client.program.ProgramInvocationException:Nomanifestfoundinjarfile'/xxxx/venv.zip'.Themanifestisneedtopointtotheprogram'smainclass.
问题分析:没有找到manifest文件来指定程序的mainclass。
问题解决:同Q4,可能是flink没有正确安装好。
Q6:Neithera'Main-Class',nora'program-class'entrywasfoundinthejarfile.
org.apache.flink.client.program.ProgramInvocationException:Neithera'Main-Class',nora'program-class'entrywasfoundinthejarfile.
问题分析:jar包里没有指定'Main-Class'或'program-class'
Q7:java.net.MalformedURLException:noprotocol:
py4j.protocol.Py4JJavaError:AnerroroccurredwhilecallingNone.java.net.URL.
:java.net.MalformedURLException:noprotocol:
问题分析:noprotocol,没有指定通信协议异常。
问题解决:看看报错所在的行,如果是URL,检查是否缺少http://;如果是路径,检查是否缺少file://。
Q8:MethodregisterFunction(...)doesnotexist
py4j.protocol.Py4JError:Anerroroccurredwhilecallingo4.registerFunction.Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException:MethodregisterFunction([classjava.lang.String,classcom.flink.udf.TopN])doesnotexist
问题分析:registerFunction函数不存在,看源码registerFunction是TableEnvironment._j_env对象的方法,怀疑是_j_env没有正确定义。_j_env应该是指java运行的环境。
问题解决:在创建TableEnvironment的时候,再传入环境变量。下面举例说明。
原来的创建方式:
#流处理环境
frompyflink.tableimportStreamTableEnvironment,EnvironmentSettings
env_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env=StreamTableEnvironment.create(environment_settings=env_settings)
改正后的创建方式,注意create函数里的第一个入参为前面初始化好的env变量:
frompyflink.datastreamimportStreamExecutionEnvironment
env=StreamExecutionEnvironment.get_execution_environment()
t_env=StreamTableEnvironment.create(env,environment_settings=env_settings)
对于批处理同理:
frompyflink.datasetimportExecutionEnvironment
env=ExecutionEnvironment.get_execution_environment()
...
Q9:Causedby:org.apache.flink.util.FlinkException:TheTaskExecutorisshuttingdown.
Causedby:org.apache.flink.util.FlinkException:TheTaskExecutorisshuttingdown.
atorg.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:404)
atorg.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:216)
问题分析:TaskExecutor没有在运行状态。可以运行jps查看是否有TaskManagerRunner。
问题解决:重启Flink。
Q11:Causedby:org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:java.lang.IllegalStateException:Processdiedwithexitcode0
org.apache.flink.runtime.JobException:RecoveryissuppressedbyNoRestartBackoffTimeStrategy
Causedby:java.lang.RuntimeException:Failedtocreatestagebundlefactory!INFO:root:Initializingpythonharness:xxxx/site-packages/pyflink/fn_execution/boot.py--id=1-1--logging_endpoint=localhost:58957--artifact_endpoint=localhost:58958--provision_endpoint=localhost:58959--control_endpoint=localhost:58956
raise_InactiveRpcError(state)
grpc._channel._InactiveRpcError:<_InactiveRpcErrorofRPCthatterminatedwith:
status=StatusCode.CANCELLED
details="ServersendMessage()failedwithError"
debug_error_string="{"created":"@1605690803.729786000","description":"Errorreceivedfrompeeripv6:[::1]:58959","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"ServersendMessage()failedwithError","grpc_status":1}"
>
Causedby:org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:java.lang.IllegalStateException:Processdiedwithexitcode0
Causedby:java.lang.IllegalStateException:Processdiedwithexitcode0
问题分析:发送消息给RPC服务失败,具体原因不知道。
问题解决:重新提交Flink作业。
换行。比如,print hello\nworld效果就是helloworld\n就是一个换行符。\是转义的意思,'\n'是换行,'\t'是tab,'\\'是,\ 是在编写程序中句子太长百,人为换行后加上\但print出来是一整行。...
十种常见排序算法一般分为以下几种:(1)非线性时间比较类排序:a. 交换类排序(快速排序、冒泡排序)b. 插入类排序(简单插入排序、希尔排序)c. 选择类排序(简单选择排序、堆排序)d. 归并排序(二路归并排序、多路归并排序)(2)线性时间非比较类排序:...
前景很好,中国正在产业升级,工业机器人和人工智能方面都会是强烈的热点,而且正好是在3~5年以后的时间。难度,肯定高,要求你有创新的思维能力,高数中的微积分、数列等等必须得非常好,软件编程(基础的应用最广泛的语言:C/C++)必须得很好,微电子(数字电...
迭代器与生成器的区别:(1)生成器:生成器本质上就是一个函数,它记住了上一次返回时在函数体中的位置。对生成器函数的第二次(或第n次)调用,跳转到函数上一次挂起的位置。而且记录了程序执行的上下文。生成器不仅记住了它的数据状态,生成器还记住了程序...
python中title( )属于python中字符串函数,返回’标题化‘的字符串,就是单词的开头为大写,其余为小写
第一种解释:代码中的cnt是count的简称,一种电脑计算机内部的数学函数的名字,在Excel办公软件中计算参数列表中的数字项的个数;在数据库( sq| server或者access )中可以用来统计符合条件的数据条数。函数COUNT在计数时,将把数值型的数字计算进去;但是...
head是方法,所以需要取小括号,即dataset.head()显示的则是前5行。data[:, :-1]和data[:, -1]。另外,如果想通过位置取数据,请使用iloc,即dataset.iloc[:, :-1]和dataset.iloc[:, -1],前者表示的是取所有行,但不包括最后一列的数据,结果是个DataFrame。...
挺简单的,其实课程内容没有我们想象的那么难、像我之前同学,完全零基础,培训了半年,直接出来就工作了,人家还在北京大公司上班,一个月15k,实力老厉害了
Python针对众多的类型,提供了众多的内建函数来处理(内建是相对于导入import来说的,后面学习到包package时,将会介绍),这些内建函数功用在于其往往可对多种类型对象进行类似的操作,即多种类型对象的共有的操作;如果某种操作只对特殊的某一类对象可行,Pyt...
相当于 ... 这里不是注释
还有FIXME
python的两个库:xlrd和xlutils。 xlrd打开excel,但是打开的excel并不能直接写入数据,需要用xlutils主要是复制一份出来,实现后续的写入功能。
单行注释:Python中的单行注释一般是以#开头的,#右边的文字都会被当做解释说明的内容,不会被当做执行的程序。为了保证代码的可读性,一般会在#后面加一两个空格然后在编写解释内容。示例:# 单行注释print(hello world)注释可以放在代码上面也可以放在代...
主要是按行读取,然后就是写出判断逻辑来勘测行是否为注视行,空行,编码行其他的:import linecachefile=open('3_2.txt','r')linecount=len(file.readlines())linecache.getline('3_2.txt',linecount)这样做的过程中发现一个问题,...
或许是里面有没被注释的代码
自学的话要看个人情况,可以先在B站找一下视频看一下
最多设置5个标签!
在此记录pyflink运行过程中遇到的问题以及解决方法。让小伙伴们少走弯路。
Q1:Nomodulenamed'encodings'
Causedby:java.io.IOException:Failedtoexecutethecommand:venv.zip/venv/bin/python-cimportpyflink;importos;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)),'bin'))
output:Couldnotfindplatformindependentlibraries
Couldnotfindplatformdependentlibraries
Considersetting$PYTHONHOMEto[:]
FatalPythonerror:initfsencoding:unabletoloadthefilesystemcodec
ModuleNotFoundError:Nomodulenamed'encodings'
问题分析:Python环境的问题,用virtualenv来管理虚拟环境时遇到。
问题解决:利用以下脚本(注意修改pyflink版本),使用miniconda3来管理虚拟环境即可解决。
wgethttps://ci.apache.org/projects/flink/flink-docs-release-1.11/downloads/setup-pyflink-virtual-env.shshsetup-pyflink-virtual-env.sh1.11.2
sourcevenv/bin/activate#激活虚拟环境
Q2:Nomodulenamed'encodings'
File"main.py",line2
SyntaxError:Non-ASCIIcharacter'\xe5'infilemain.pyonline3,butnoencodingdeclared;seehttp://www.python.org/peps/pep-0263.htmlfordetails
org.apache.flink.client.program.ProgramAbortException
问题分析:表面上是无法解析非ASCII码,实际上是因为当前的python版本错了。通过flinkrun来提交Python任务时,Flink会调用“python”命令,要求python的版本为3.5,3.6或者3.7中的一个。
问题解决:激活虚拟环境,使得运行python-V时显示的python版本为3.5,3.6或3.7
Q3:Couldnotfindanyfactoryforidentifier'kafka'
Causedby:org.apache.flink.table.api.ValidationException:Couldnotfindanyfactoryforidentifier'kafka'thatimplements'org.apache.flink.table.factories.DynamicTableSourceFactory'intheclasspath.org.apache.flink.client.program.ProgramAbortException
问题分析:没有指定kafkajar文件
问题解决:在flinkrun的时候,加入参数-jflink-sql-connector-kafka_2.11-1.11.2.jar,具体jar包的下载路径见ApacheKafkaSQLConnector,根据kafka版本选择jar包来下载。
Q4:Theparallelismmustbeapositivenumber:yarch
Theparallelismmustbeapositivenumber:yarch
问题分析:并行度要设置为正数
问题解决:在flinkrun的时候,加入参数-p1。但问题可能出在flink上。直接部署的flink没有问题,但是集成到CDH后,flink的配置参数发生了变化,导致无法以简单的flinkrun的方式来运行
Q5:Nomanifestfoundinjarfile'/xxxx/venv.zip'
org.apache.flink.client.program.ProgramInvocationException:Nomanifestfoundinjarfile'/xxxx/venv.zip'.Themanifestisneedtopointtotheprogram'smainclass.
问题分析:没有找到manifest文件来指定程序的mainclass。
问题解决:同Q4,可能是flink没有正确安装好。
Q6:Neithera'Main-Class',nora'program-class'entrywasfoundinthejarfile.
org.apache.flink.client.program.ProgramInvocationException:Neithera'Main-Class',nora'program-class'entrywasfoundinthejarfile.
问题分析:jar包里没有指定'Main-Class'或'program-class'
问题解决:同Q4,可能是flink没有正确安装好。
Q7:java.net.MalformedURLException:noprotocol:
py4j.protocol.Py4JJavaError:AnerroroccurredwhilecallingNone.java.net.URL.
:java.net.MalformedURLException:noprotocol:
问题分析:noprotocol,没有指定通信协议异常。
问题解决:看看报错所在的行,如果是URL,检查是否缺少http://;如果是路径,检查是否缺少file://。
Q8:MethodregisterFunction(...)doesnotexist
py4j.protocol.Py4JError:Anerroroccurredwhilecallingo4.registerFunction.Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException:MethodregisterFunction([classjava.lang.String,classcom.flink.udf.TopN])doesnotexist
问题分析:registerFunction函数不存在,看源码registerFunction是TableEnvironment._j_env对象的方法,怀疑是_j_env没有正确定义。_j_env应该是指java运行的环境。
问题解决:在创建TableEnvironment的时候,再传入环境变量。下面举例说明。
原来的创建方式:
#流处理环境
frompyflink.tableimportStreamTableEnvironment,EnvironmentSettings
env_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env=StreamTableEnvironment.create(environment_settings=env_settings)
改正后的创建方式,注意create函数里的第一个入参为前面初始化好的env变量:
#流处理环境
frompyflink.datastreamimportStreamExecutionEnvironment
frompyflink.tableimportStreamTableEnvironment,EnvironmentSettings
env=StreamExecutionEnvironment.get_execution_environment()
env_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env=StreamTableEnvironment.create(env,environment_settings=env_settings)
对于批处理同理:
frompyflink.datasetimportExecutionEnvironment
env=ExecutionEnvironment.get_execution_environment()
...
Q9:Causedby:org.apache.flink.util.FlinkException:TheTaskExecutorisshuttingdown.
Causedby:org.apache.flink.util.FlinkException:TheTaskExecutorisshuttingdown.
atorg.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:404)
atorg.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:216)
问题分析:TaskExecutor没有在运行状态。可以运行jps查看是否有TaskManagerRunner。
问题解决:重启Flink。
Q11:Causedby:org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:java.lang.IllegalStateException:Processdiedwithexitcode0
org.apache.flink.runtime.JobException:RecoveryissuppressedbyNoRestartBackoffTimeStrategy
...
Causedby:java.lang.RuntimeException:Failedtocreatestagebundlefactory!INFO:root:Initializingpythonharness:xxxx/site-packages/pyflink/fn_execution/boot.py--id=1-1--logging_endpoint=localhost:58957--artifact_endpoint=localhost:58958--provision_endpoint=localhost:58959--control_endpoint=localhost:58956
...
raise_InactiveRpcError(state)
grpc._channel._InactiveRpcError:<_InactiveRpcErrorofRPCthatterminatedwith:
status=StatusCode.CANCELLED
details="ServersendMessage()failedwithError"
debug_error_string="{"created":"@1605690803.729786000","description":"Errorreceivedfrompeeripv6:[::1]:58959","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"ServersendMessage()failedwithError","grpc_status":1}"
>
...
Causedby:org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:java.lang.IllegalStateException:Processdiedwithexitcode0
...
Causedby:java.lang.IllegalStateException:Processdiedwithexitcode0
...
问题分析:发送消息给RPC服务失败,具体原因不知道。
问题解决:重新提交Flink作业。
相关问题推荐
换行。比如,print hello\nworld效果就是helloworld\n就是一个换行符。\是转义的意思,'\n'是换行,'\t'是tab,'\\'是,\ 是在编写程序中句子太长百,人为换行后加上\但print出来是一整行。...
十种常见排序算法一般分为以下几种:(1)非线性时间比较类排序:a. 交换类排序(快速排序、冒泡排序)b. 插入类排序(简单插入排序、希尔排序)c. 选择类排序(简单选择排序、堆排序)d. 归并排序(二路归并排序、多路归并排序)(2)线性时间非比较类排序:...
前景很好,中国正在产业升级,工业机器人和人工智能方面都会是强烈的热点,而且正好是在3~5年以后的时间。难度,肯定高,要求你有创新的思维能力,高数中的微积分、数列等等必须得非常好,软件编程(基础的应用最广泛的语言:C/C++)必须得很好,微电子(数字电...
迭代器与生成器的区别:(1)生成器:生成器本质上就是一个函数,它记住了上一次返回时在函数体中的位置。对生成器函数的第二次(或第n次)调用,跳转到函数上一次挂起的位置。而且记录了程序执行的上下文。生成器不仅记住了它的数据状态,生成器还记住了程序...
python中title( )属于python中字符串函数,返回’标题化‘的字符串,就是单词的开头为大写,其余为小写
第一种解释:代码中的cnt是count的简称,一种电脑计算机内部的数学函数的名字,在Excel办公软件中计算参数列表中的数字项的个数;在数据库( sq| server或者access )中可以用来统计符合条件的数据条数。函数COUNT在计数时,将把数值型的数字计算进去;但是...
head是方法,所以需要取小括号,即dataset.head()显示的则是前5行。data[:, :-1]和data[:, -1]。另外,如果想通过位置取数据,请使用iloc,即dataset.iloc[:, :-1]和dataset.iloc[:, -1],前者表示的是取所有行,但不包括最后一列的数据,结果是个DataFrame。...
挺简单的,其实课程内容没有我们想象的那么难、像我之前同学,完全零基础,培训了半年,直接出来就工作了,人家还在北京大公司上班,一个月15k,实力老厉害了
Python针对众多的类型,提供了众多的内建函数来处理(内建是相对于导入import来说的,后面学习到包package时,将会介绍),这些内建函数功用在于其往往可对多种类型对象进行类似的操作,即多种类型对象的共有的操作;如果某种操作只对特殊的某一类对象可行,Pyt...
相当于 ... 这里不是注释
还有FIXME
python的两个库:xlrd和xlutils。 xlrd打开excel,但是打开的excel并不能直接写入数据,需要用xlutils主要是复制一份出来,实现后续的写入功能。
单行注释:Python中的单行注释一般是以#开头的,#右边的文字都会被当做解释说明的内容,不会被当做执行的程序。为了保证代码的可读性,一般会在#后面加一两个空格然后在编写解释内容。示例:# 单行注释print(hello world)注释可以放在代码上面也可以放在代...
主要是按行读取,然后就是写出判断逻辑来勘测行是否为注视行,空行,编码行其他的:import linecachefile=open('3_2.txt','r')linecount=len(file.readlines())linecache.getline('3_2.txt',linecount)这样做的过程中发现一个问题,...
或许是里面有没被注释的代码
自学的话要看个人情况,可以先在B站找一下视频看一下