Flink + Hive data mining fails with java.lang.IllegalArgumentException: The minBucketMemorySize is not valid!
Project background
The project uses Flink (1.12.0) + Hive (3.1) for data analysis, developed in IDEA on Windows 10 x64 with 8 GB of RAM. Local debugging was required, and the data volume is only two tables of roughly 30,000 rows each, so the job runs locally through the flink-client MiniCluster.
Problem description
After joining tables A and B, the query groups by four of the fields and performs sum() and select(). This is where the problem appears; the stack trace is as follows:
Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115)
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:155)
at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:149)
at com.wondersgroup.shrs.dwh.jobs.socialinsurance.dws.gs.GS_Report_WI4_Supplies.process(GS_Report_WI4_Supplies.java:127)
at com.wondersgroup.shrs.dwh.base.BaseJob.run(BaseJob.java:86)
at com.wondersgroup.shrs.dwh.base.AdhocJob.run(AdhocJob.java:18)
at com.wondersgroup.shrs.dwh.base.Executor.execute(Executor.java:85)
at com.wondersgroup.shrs.dwh.base.Executor.execute(Executor.java:69)
at com.wondersgroup.shrs.dwh.base.cmd.RunAdHocJob.main(RunAdHocJob.java:31)
Caused by: java.io.IOException: Failed to fetch job execution result
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103)
... 11 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:172)
... 13 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.lang.IllegalArgumentException: The minBucketMemorySize is not valid!
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142)
at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:189)
at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:147)
at HashAggregateWithKeys$245.open(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.NullPointerException
at HashAggregateWithKeys$245.close(Unknown Source)
at org.apache.flink.table.runtime.operators.TableStreamOperator.dispose(TableStreamOperator.java:46)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:740)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:720)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:643)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:552)
... 3 more
Suppressed: java.lang.NullPointerException
at org.apache.flink.table.runtime.operators.join.HashJoinOperator.close(HashJoinOperator.java:213)
at org.apache.flink.table.runtime.operators.TableStreamOperator.dispose(TableStreamOperator.java:46)
at org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.dispose(MultipleInputStreamOperatorBase.java:162)
... 7 more
Troubleshooting notes
Where is HashAggregateWithKeys?
Following the stack trace, the error originates in the constructor of org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.
The cause is that memorySize does not reach 1 MB.
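The failing precondition can be sketched like this (an illustrative reconstruction, not the actual Flink source; the constant name and the exact 1 MB threshold are inferred from the observed behavior):

```java
// Illustrative sketch of the check that throws in the BytesHashMap
// constructor. Names and the threshold are assumptions, not copied
// from Flink's source.
public class MinBucketMemoryCheck {

    // BytesHashMap needs roughly 1 MB of managed memory for its initial buckets.
    static final long MIN_REQUIRED_MEMORY = 1024 * 1024;

    static void check(long memorySize) {
        if (memorySize < MIN_REQUIRED_MEMORY) {
            throw new IllegalArgumentException("The minBucketMemorySize is not valid!");
        }
    }

    public static void main(String[] args) {
        check(2 * 1024 * 1024);     // 2 MB of managed memory: passes
        try {
            check(512 * 1024);      // 512 KB: below the minimum, throws
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```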
So we need to find out how memorySize is passed in, which means reading the source of HashAggregateWithKeys. But a global search across the whole project and all jar files in IDEA finds no such class. Why???
Answer: the source of HashAggregateWithKeys is generated at runtime by string concatenation in org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenerator (what the fuck!!!!!). The generation includes this snippet:
private[flink] def prepareHashAggMap(
    ctx: CodeGeneratorContext,
    groupKeyTypesTerm: String,
    aggBufferTypesTerm: String,
    aggregateMapTerm: String): Unit = {
  // create aggregate map
  val mapTypeTerm = classOf[BytesHashMap].getName
  ctx.addReusableMember(s"private transient $mapTypeTerm $aggregateMapTerm;")
  ctx.addReusableOpenStatement(s"$aggregateMapTerm " +
    s"= new $mapTypeTerm(" +
    s"this.getContainingTask()," +
    s"this.getContainingTask().getEnvironment().getMemoryManager()," +
    s"computeMemorySize()," +
    s" $groupKeyTypesTerm," +
    s" $aggBufferTypesTerm);")
  // close aggregate map and release memory segments
  ctx.addReusableCloseStatement(s"$aggregateMapTerm.free();")
  ctx.addReusableCloseStatement(s"")
}
As we can see, a BytesHashMap object is constructed here, and its memorySize argument comes from the "computeMemorySize()" call spliced into the generated string.
Where is computeMemorySize?
Since string-concatenated generated code is hard to read, IDEA's full-text search helps again: searching all files for the keyword "computeMemorySize()" leads straight to the method, org.apache.flink.table.runtime.operators.TableStreamOperator#computeMemorySize. The source:
// org.apache.flink.table.runtime.operators.TableStreamOperator
/**
 * Compute memory size from memory fraction.
 */
public long computeMemorySize() {
    final Environment environment = getContainingTask().getEnvironment();
    return environment.getMemoryManager().computeMemorySize(
        getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(
            ManagedMemoryUseCase.BATCH_OP,
            environment.getTaskManagerInfo().getConfiguration(),
            environment.getUserCodeClassLoader().asClassLoader()));
}

// org.apache.flink.runtime.memory.MemoryManager
/**
 * Computes the memory size corresponding to the fraction of all memory governed by this MemoryManager.
 *
 * @param fraction The fraction of all memory governed by this MemoryManager
 * @return The memory size corresponding to the memory fraction
 */
public long computeMemorySize(double fraction) {
    validateFraction(fraction);
    return (long) Math.floor(memoryBudget.getTotalMemorySize() * fraction);
}
This shows that memorySize is governed by two variables: totalMemorySize and fraction.
Across many debug runs, the value of fraction changed with the number of keys in the group by; I have not found the cause of this variation (if any fellow engineer knows, please enlighten me). That leaves only one option: increase totalMemorySize.
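To see how the numbers play out, here is a plain-Java sketch that mirrors the formula above, floor(totalMemorySize * fraction). The fraction values are made-up examples for illustration, not values observed in a real job:

```java
public class MemorySizeCalc {

    // Mirrors the arithmetic of MemoryManager.computeMemorySize:
    // the operator gets floor(total * fraction) bytes of managed memory.
    static long computeMemorySize(long totalMemorySize, double fraction) {
        if (fraction <= 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("Invalid fraction: " + fraction);
        }
        return (long) Math.floor(totalMemorySize * fraction);
    }

    public static void main(String[] args) {
        // Default observed in this post: taskmanager.memory.managed.size = 128 MB.
        long total = 128L * 1024 * 1024;
        // A generous fraction stays well above the 1 MB minimum...
        System.out.println(computeMemorySize(total, 0.25));  // 33554432 bytes (32 MB)
        // ...but a tiny fraction (e.g. many operators sharing one slot) falls below it.
        System.out.println(computeMemorySize(total, 0.005)); // 671088 bytes (~655 KB)
    }
}
```

With the default 128 MB budget, any fraction below roughly 1/128 yields less than 1 MB and triggers the exception, which is why enlarging the total budget fixes it.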
How do we increase totalMemorySize?
To increase totalMemorySize we have to go through Flink's official configuration options, which means first working out which part of the memory model is involved. The official documentation states that managed memory is what batch operators (sorting, hash tables, and so on) draw from.
That basically settles it: we need to increase taskmanager.memory.managed.size. Tracing with the debugger shows this variable defaults to 128 MB here.
Setting Managed Memory
Flink options are normally set through a Configuration object.
However, when running through the flink-client MiniCluster, they cannot be set that way.
Reading the source of TableEnvironment.create(EnvironmentSettings) shows that when the Executor is created, a brand-new StreamExecutionEnvironment is constructed with a fresh Configuration. As application developers, we have exactly one way to influence the data inside that StreamExecutionEnvironment instance: the initializeContextEnvironment method.
It is declared protected, so we can gain access to it by subclassing StreamExecutionEnvironment:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;

public class Test2 extends StreamExecutionEnvironment {

    public static void initial(Configuration configuration) {
        initializeContextEnvironment(new EnvFactory(configuration));
    }

    private static class EnvFactory implements StreamExecutionEnvironmentFactory {

        private final Configuration commonConfig;

        EnvFactory(Configuration configuration) {
            this.commonConfig = configuration;
        }

        @Override
        public StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration) {
            this.commonConfig.addAll(configuration);
            return StreamExecutionEnvironment.createLocalEnvironment(this.commonConfig);
        }
    }
}
Now, before constructing the TableEnvironment, Test2 lets us set the parameters we need:
Configuration configuration = new Configuration();
configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(256));
Test2.initial(configuration);
TableEnvironment tableEnv = TableEnvironment.create(createBatchSetting());
Solution summary
1. (This applies to the MiniCluster; when running on a Flink cluster, set the option in conf/flink-conf.yaml instead.) Write a small helper that pushes the memory options we need into the MiniCluster:
public class Test2 extends StreamExecutionEnvironment {

    public static void initial(Configuration configuration) {
        initializeContextEnvironment(new EnvFactory(configuration));
    }

    private static class EnvFactory implements StreamExecutionEnvironmentFactory {

        private final Configuration commonConfig;

        EnvFactory(Configuration configuration) {
            this.commonConfig = configuration;
        }

        @Override
        public StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration) {
            this.commonConfig.addAll(configuration);
            return StreamExecutionEnvironment.createLocalEnvironment(this.commonConfig);
        }
    }
}
2. Set the memory option. Since the exception in this article is triggered by the group by (hash aggregation) step, it is the managed memory used by such batch operators that needs to grow:
Configuration configuration = new Configuration();
configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(256));
Test2.initial(configuration);
TableEnvironment tableEnv = TableEnvironment.create(createBatchSetting());
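For completeness: on a real cluster, the equivalent of the code above is a single entry in conf/flink-conf.yaml (256m is the example value used in this post, not a general recommendation):

```yaml
# conf/flink-conf.yaml
taskmanager.memory.managed.size: 256m
```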