pyflink报错提示:findAndCreateTableSource failed 怎么解决

2021-01-26 10:50发布

尝试使用pyflink的tabl api去连接kafka。但提交时异常报错:pyflink.util.exception.TableException:findAndCreateTableSource...

尝试使用pyflink的tabl api去连接kafka。但提交时异常报错:pyflink.util.exception.TableException:findAndCreateTableSource failed

通过DDL建立源表的配置上应该是没有问题的


1条回答
给你三个亿
2楼 · 2021-02-23 10:24



在此记录pyflink运行过程中遇到的问题以及解决方法。让小伙伴们少走弯路。

Q1:Nomodulenamed'encodings'

Causedby:java.io.IOException:Failedtoexecutethecommand:venv.zip/venv/bin/python-cimportpyflink;importos;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)),'bin'))

output:Couldnotfindplatformindependentlibraries

Couldnotfindplatformdependentlibraries

Considersetting$PYTHONHOMEto[:]

FatalPythonerror:initfsencoding:unabletoloadthefilesystemcodec

ModuleNotFoundError:Nomodulenamed'encodings'

问题分析:Python环境的问题,用virtualenv来管理虚拟环境时遇到。

问题解决:利用以下脚本(注意修改pyflink版本),使用miniconda3来管理虚拟环境即可解决。

wgethttps://ci.apache.org/projects/flink/flink-docs-release-1.11/downloads/setup-pyflink-virtual-env.shshsetup-pyflink-virtual-env.sh1.11.2

sourcevenv/bin/activate#激活虚拟环境

Q2:Nomodulenamed'encodings'

File"main.py",line2

SyntaxError:Non-ASCIIcharacter'\xe5'infilemain.pyonline3,butnoencodingdeclared;seehttp://www.python.org/peps/pep-0263.htmlfordetails

org.apache.flink.client.program.ProgramAbortException

问题分析:表面上是无法解析非ASCII码,实际上是因为当前的python版本错了。通过flinkrun来提交Python任务时,Flink会调用“python”命令,要求python的版本为3.5,3.6或者3.7中的一个。

问题解决:激活虚拟环境,使得运行python-V时显示的python版本为3.5,3.6或3.7

Q3:Couldnotfindanyfactoryforidentifier'kafka'

Causedby:org.apache.flink.table.api.ValidationException:Couldnotfindanyfactoryforidentifier'kafka'thatimplements'org.apache.flink.table.factories.DynamicTableSourceFactory'intheclasspath.org.apache.flink.client.program.ProgramAbortException

问题分析:没有指定kafkajar文件

问题解决:在flinkrun的时候,加入参数-jflink-sql-connector-kafka_2.11-1.11.2.jar,具体jar包的下载路径见ApacheKafkaSQLConnector,根据kafka版本选择jar包来下载。

Q4:Theparallelismmustbeapositivenumber:yarch

Theparallelismmustbeapositivenumber:yarch

问题分析:并行度要设置为正数

问题解决:在flinkrun的时候,加入参数-p1。但问题可能出在flink上。直接部署的flink没有问题,但是集成到CDH后,flink的配置参数发生了变化,导致无法以简单的flinkrun的方式来运行

Q5:Nomanifestfoundinjarfile'/xxxx/venv.zip'

org.apache.flink.client.program.ProgramInvocationException:Nomanifestfoundinjarfile'/xxxx/venv.zip'.Themanifestisneedtopointtotheprogram'smainclass.

问题分析:没有找到manifest文件来指定程序的mainclass。

问题解决:同Q4,可能是flink没有正确安装好。

Q6:Neithera'Main-Class',nora'program-class'entrywasfoundinthejarfile.

org.apache.flink.client.program.ProgramInvocationException:Neithera'Main-Class',nora'program-class'entrywasfoundinthejarfile.

问题分析:jar包里没有指定'Main-Class'或'program-class'

问题解决:同Q4,可能是flink没有正确安装好。

Q7:java.net.MalformedURLException:noprotocol:

py4j.protocol.Py4JJavaError:AnerroroccurredwhilecallingNone.java.net.URL.

:java.net.MalformedURLException:noprotocol:

问题分析:noprotocol,没有指定通信协议异常。

问题解决:看看报错所在的行,如果是URL,检查是否缺少http://;如果是路径,检查是否缺少file://。

Q8:MethodregisterFunction(...)doesnotexist

py4j.protocol.Py4JError:Anerroroccurredwhilecallingo4.registerFunction.Trace:

org.apache.flink.api.python.shaded.py4j.Py4JException:MethodregisterFunction([classjava.lang.String,classcom.flink.udf.TopN])doesnotexist

问题分析:registerFunction函数不存在,看源码registerFunction是TableEnvironment._j_env对象的方法,怀疑是_j_env没有正确定义。_j_env应该是指java运行的环境。

问题解决:在创建TableEnvironment的时候,再传入环境变量。下面举例说明。

原来的创建方式:

#流处理环境

frompyflink.tableimportStreamTableEnvironment,EnvironmentSettings


env_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()

t_env=StreamTableEnvironment.create(environment_settings=env_settings)

改正后的创建方式,注意create函数里的第一个入参为前面初始化好的env变量:

#流处理环境

frompyflink.datastreamimportStreamExecutionEnvironment

frompyflink.tableimportStreamTableEnvironment,EnvironmentSettings


env=StreamExecutionEnvironment.get_execution_environment()

env_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()

t_env=StreamTableEnvironment.create(env,environment_settings=env_settings)

对于批处理同理:

frompyflink.datasetimportExecutionEnvironment


env=ExecutionEnvironment.get_execution_environment()

...

Q9:Causedby:org.apache.flink.util.FlinkException:TheTaskExecutorisshuttingdown.

Causedby:org.apache.flink.util.FlinkException:TheTaskExecutorisshuttingdown.

atorg.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:404)

atorg.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:216)

问题分析:TaskExecutor没有在运行状态。可以运行jps查看是否有TaskManagerRunner。

问题解决:重启Flink。

Q11:Causedby:org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:java.lang.IllegalStateException:Processdiedwithexitcode0

org.apache.flink.runtime.JobException:RecoveryissuppressedbyNoRestartBackoffTimeStrategy

...

Causedby:java.lang.RuntimeException:Failedtocreatestagebundlefactory!INFO:root:Initializingpythonharness:xxxx/site-packages/pyflink/fn_execution/boot.py--id=1-1--logging_endpoint=localhost:58957--artifact_endpoint=localhost:58958--provision_endpoint=localhost:58959--control_endpoint=localhost:58956

...

raise_InactiveRpcError(state)

grpc._channel._InactiveRpcError:<_InactiveRpcErrorofRPCthatterminatedwith:

status=StatusCode.CANCELLED

details="ServersendMessage()failedwithError"

debug_error_string="{"created":"@1605690803.729786000","description":"Errorreceivedfrompeeripv6:[::1]:58959","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"ServersendMessage()failedwithError","grpc_status":1}"

>

...

Causedby:org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:java.lang.IllegalStateException:Processdiedwithexitcode0

...

Causedby:java.lang.IllegalStateException:Processdiedwithexitcode0

...

问题分析:发送消息给RPC服务失败,具体原因不知道。

问题解决:重新提交Flink作业。



相关问题推荐

  • 回答 3

    换行。比如,print hello\nworld效果就是helloworld\n就是一个换行符。\是转义的意思,&#39;\n&#39;是换行,&#39;\t&#39;是tab,&#39;\\&#39;是,\ 是在编写程序中句子太长百,人为换行后加上\但print出来是一整行。...

  • 回答 42

    十种常见排序算法一般分为以下几种:(1)非线性时间比较类排序:a. 交换类排序(快速排序、冒泡排序)b. 插入类排序(简单插入排序、希尔排序)c. 选择类排序(简单选择排序、堆排序)d. 归并排序(二路归并排序、多路归并排序)(2)线性时间非比较类排序:...

  • 回答 70
    已采纳

    前景很好,中国正在产业升级,工业机器人和人工智能方面都会是强烈的热点,而且正好是在3~5年以后的时间。难度,肯定高,要求你有创新的思维能力,高数中的微积分、数列等等必须得非常好,软件编程(基础的应用最广泛的语言:C/C++)必须得很好,微电子(数字电...

  • 回答 28

    迭代器与生成器的区别:(1)生成器:生成器本质上就是一个函数,它记住了上一次返回时在函数体中的位置。对生成器函数的第二次(或第n次)调用,跳转到函数上一次挂起的位置。而且记录了程序执行的上下文。生成器不仅记住了它的数据状态,生成器还记住了程序...

  • 回答 9

    python中title( )属于python中字符串函数,返回’标题化‘的字符串,就是单词的开头为大写,其余为小写

  • 回答 6

    第一种解释:代码中的cnt是count的简称,一种电脑计算机内部的数学函数的名字,在Excel办公软件中计算参数列表中的数字项的个数;在数据库( sq| server或者access )中可以用来统计符合条件的数据条数。函数COUNT在计数时,将把数值型的数字计算进去;但是...

  • 回答 1

    head是方法,所以需要取小括号,即dataset.head()显示的则是前5行。data[:, :-1]和data[:, -1]。另外,如果想通过位置取数据,请使用iloc,即dataset.iloc[:, :-1]和dataset.iloc[:, -1],前者表示的是取所有行,但不包括最后一列的数据,结果是个DataFrame。...

  • Python入门简单吗2021-09-23 13:21
    回答 45

    挺简单的,其实课程内容没有我们想象的那么难、像我之前同学,完全零基础,培训了半年,直接出来就工作了,人家还在北京大公司上班,一个月15k,实力老厉害了

  • 回答 4

    Python针对众多的类型,提供了众多的内建函数来处理(内建是相对于导入import来说的,后面学习到包package时,将会介绍),这些内建函数功用在于其往往可对多种类型对象进行类似的操作,即多种类型对象的共有的操作;如果某种操作只对特殊的某一类对象可行,Pyt...

  • 回答 8

     相当于 ... 这里不是注释

  • 回答 4

    还有FIXME

  • 回答 3

    python的两个库:xlrd和xlutils。 xlrd打开excel,但是打开的excel并不能直接写入数据,需要用xlutils主要是复制一份出来,实现后续的写入功能。

  • 回答 8

    单行注释:Python中的单行注释一般是以#开头的,#右边的文字都会被当做解释说明的内容,不会被当做执行的程序。为了保证代码的可读性,一般会在#后面加一两个空格然后在编写解释内容。示例:#  单行注释print(hello world)注释可以放在代码上面也可以放在代...

  • 回答 2

    主要是按行读取,然后就是写出判断逻辑来勘测行是否为注视行,空行,编码行其他的:import linecachefile=open(&#39;3_2.txt&#39;,&#39;r&#39;)linecount=len(file.readlines())linecache.getline(&#39;3_2.txt&#39;,linecount)这样做的过程中发现一个问题,...

  • 回答 4

    或许是里面有没被注释的代码

  • 回答 26

    自学的话要看个人情况,可以先在B站找一下视频看一下

没有解决我的问题,去提问