digraph G {
0 [labelType="html" label="<br><b>TakeOrderedAndProject</b><br><br>"];
subgraph cluster1 {
isCluster="true";
label="WholeStageCodegen (5)\n \nduration: total (min, med, max (stageId: taskId))\n109 ms (0 ms, 0 ms, 102 ms (stage 2153.0: task 1073))";
2 [labelType="html" label="<br><b>Project</b><br><br>"];
3 [labelType="html" label="<b>SortMergeJoin</b><br><br>number of output rows: 265"];
}
subgraph cluster4 {
isCluster="true";
label="WholeStageCodegen (2)\n \nduration: total (min, med, max (stageId: taskId))\n532 ms (0 ms, 2 ms, 130 ms (stage 2153.0: task 1073))";
5 [labelType="html" label="<b>Sort</b><br><br>sort time total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 2153.0: task 1074))<br>peak memory total (min, med, max (stageId: taskId))<br>9.3 GiB (64.0 KiB, 64.1 MiB, 64.1 MiB (stage 2153.0: task 1073))<br>spill size total (min, med, max (stageId: taskId))<br>0.0 B (0.0 B, 0.0 B, 0.0 B (stage 2153.0: task 1074))"];
}
6 [labelType="html" label="<b>Exchange</b><br><br>shuffle records written: 265<br>shuffle write time: 9 ms<br>records read: 265<br>local bytes read total (min, med, max (stageId: taskId))<br>27.3 KiB (0.0 B, 142.0 B, 457.0 B (stage 2153.0: task 1177))<br>fetch wait time total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 2153.0: task 1073))<br>local blocks read: 148<br>data size: 28.4 KiB<br>shuffle bytes written: 27.3 KiB"];
subgraph cluster7 {
isCluster="true";
label="WholeStageCodegen (1)\n \nduration: 73 ms";
8 [labelType="html" label="<br><b>Project</b><br><br>"];
9 [labelType="html" label="<b>Filter</b><br><br>number of output rows: 265"];
10 [labelType="html" label="<b>Scan ExistingRDD</b><br><br>number of output rows: 265"];
}
subgraph cluster11 {
isCluster="true";
label="WholeStageCodegen (4)\n \nduration: total (min, med, max (stageId: taskId))\n76 ms (0 ms, 0 ms, 62 ms (stage 2153.0: task 1073))";
12 [labelType="html" label="<b>Sort</b><br><br>sort time total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 2153.0: task 1073))<br>peak memory total (min, med, max (stageId: taskId))<br>9.3 GiB (0.0 B, 64.1 MiB, 64.1 MiB (stage 2153.0: task 1073))<br>spill size total (min, med, max (stageId: taskId))<br>0.0 B (0.0 B, 0.0 B, 0.0 B (stage 2153.0: task 1073))"];
}
13 [labelType="html" label="<b>Exchange</b><br><br>shuffle records written: 265<br>shuffle write time: 9 ms<br>records read: 265<br>fetch wait time total (min, med, max (stageId: taskId))<br>3 ms (0 ms, 0 ms, 2 ms (stage 2153.0: task 1104))<br>remote bytes read total (min, med, max (stageId: taskId))<br>12.8 KiB (0.0 B, 74.0 B, 166.0 B (stage 2153.0: task 1177))<br>remote blocks read: 148<br>data size: 10.4 KiB<br>shuffle bytes written: 12.8 KiB"];
subgraph cluster14 {
isCluster="true";
label="WholeStageCodegen (3)\n \nduration: 14 ms";
15 [labelType="html" label="<br><b>Project</b><br><br>"];
16 [labelType="html" label="<br><b>SerializeFromObject</b><br><br>"];
}
17 [labelType="html" label="<b>Scan</b><br><br>number of output rows: 265"];
2->0;
3->2;
5->3;
6->5;
8->6;
9->8;
10->9;
12->3;
13->12;
15->13;
16->15;
17->16;
}
18
TakeOrderedAndProject(limit=6, orderBy=[pagerank#499 DESC NULLS LAST], output=[id#551,pagerank#552])
Project [attr#306.id AS id#495, graphx_attr#462.pagerank AS pagerank#499]
SortMergeJoin [new_id#308L], [new_id#457L], Inner
WholeStageCodegen (5)
Sort [new_id#308L ASC NULLS FIRST], false, 0
WholeStageCodegen (2)
Exchange hashpartitioning(new_id#308L, 200), true, [id=#528]
Project [cast(id#105 as bigint) AS new_id#308L, struct(id, id#105, Borough, Borough#106, Zone, Zone#107, service_zone, service_zone#108) AS attr#306]
Filter isnotnull(cast(id#105 as bigint))
Scan ExistingRDD[id#105,Borough#106,Zone#107,service_zone#108]
WholeStageCodegen (1)
Sort [new_id#457L ASC NULLS FIRST], false, 0
WholeStageCodegen (4)
Exchange hashpartitioning(new_id#457L, 200), true, [id=#537]
Project [struct(pagerank, _2#454._1) AS graphx_attr#462, _1#453L AS new_id#457L]
SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#453L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1) AS _2#454]
WholeStageCodegen (3)
Scan[obj#452]
== Parsed Logical Plan ==
GlobalLimit 6
+- LocalLimit 6
+- Project [cast(id#495 as string) AS id#551, cast(pagerank#499 as string) AS pagerank#552]
+- Sort [pagerank#499 DESC NULLS LAST], true
+- Project [id#495, pagerank#499]
+- Project [attr#306.id AS id#495, attr#306.Borough AS Borough#496, attr#306.Zone AS Zone#497, attr#306.service_zone AS service_zone#498, graphx_attr#462.pagerank AS pagerank#499]
+- Project [attr#306, graphx_attr#462]
+- Project [new_id#308L, attr#306, graphx_attr#462]
+- Join Inner, (new_id#308L = new_id#457L)
:- Project [new_id#308L, attr#306]
: +- Project [cast(attr#306.id as bigint) AS new_id#308L, attr#306.id AS id#309, attr#306]
: +- Project [struct(id, id#105, Borough, Borough#106, Zone, Zone#107, service_zone, service_zone#108) AS attr#306]
: +- LogicalRDD [id#105, Borough#106, Zone#107, service_zone#108], false
+- Project [struct(pagerank, graphx_attr#458._1) AS graphx_attr#462, new_id#457L]
+- Project [_1#453L AS new_id#457L, _2#454 AS graphx_attr#458]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#453L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1) AS _2#454]
+- ExternalRDD [obj#452]
== Analyzed Logical Plan ==
id: string, pagerank: string
GlobalLimit 6
+- LocalLimit 6
+- Project [cast(id#495 as string) AS id#551, cast(pagerank#499 as string) AS pagerank#552]
+- Sort [pagerank#499 DESC NULLS LAST], true
+- Project [id#495, pagerank#499]
+- Project [attr#306.id AS id#495, attr#306.Borough AS Borough#496, attr#306.Zone AS Zone#497, attr#306.service_zone AS service_zone#498, graphx_attr#462.pagerank AS pagerank#499]
+- Project [attr#306, graphx_attr#462]
+- Project [new_id#308L, attr#306, graphx_attr#462]
+- Join Inner, (new_id#308L = new_id#457L)
:- Project [new_id#308L, attr#306]
: +- Project [cast(attr#306.id as bigint) AS new_id#308L, attr#306.id AS id#309, attr#306]
: +- Project [struct(id, id#105, Borough, Borough#106, Zone, Zone#107, service_zone, service_zone#108) AS attr#306]
: +- LogicalRDD [id#105, Borough#106, Zone#107, service_zone#108], false
+- Project [struct(pagerank, graphx_attr#458._1) AS graphx_attr#462, new_id#457L]
+- Project [_1#453L AS new_id#457L, _2#454 AS graphx_attr#458]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#453L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1) AS _2#454]
+- ExternalRDD [obj#452]
== Optimized Logical Plan ==
GlobalLimit 6
+- LocalLimit 6
+- Project [cast(id#495 as string) AS id#551, cast(pagerank#499 as string) AS pagerank#552]
+- Sort [pagerank#499 DESC NULLS LAST], true
+- Project [attr#306.id AS id#495, graphx_attr#462.pagerank AS pagerank#499]
+- Join Inner, (new_id#308L = new_id#457L)
:- Project [cast(id#105 as bigint) AS new_id#308L, struct(id, id#105, Borough, Borough#106, Zone, Zone#107, service_zone, service_zone#108) AS attr#306]
: +- Filter isnotnull(cast(id#105 as bigint))
: +- LogicalRDD [id#105, Borough#106, Zone#107, service_zone#108], false
+- Project [struct(pagerank, _2#454._1) AS graphx_attr#462, _1#453L AS new_id#457L]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#453L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1) AS _2#454]
+- ExternalRDD [obj#452]
== Physical Plan ==
TakeOrderedAndProject(limit=6, orderBy=[pagerank#499 DESC NULLS LAST], output=[id#551,pagerank#552])
+- *(5) Project [attr#306.id AS id#495, graphx_attr#462.pagerank AS pagerank#499]
+- *(5) SortMergeJoin [new_id#308L], [new_id#457L], Inner
:- *(2) Sort [new_id#308L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(new_id#308L, 200), true, [id=#528]
: +- *(1) Project [cast(id#105 as bigint) AS new_id#308L, struct(id, id#105, Borough, Borough#106, Zone, Zone#107, service_zone, service_zone#108) AS attr#306]
: +- *(1) Filter isnotnull(cast(id#105 as bigint))
: +- *(1) Scan ExistingRDD[id#105,Borough#106,Zone#107,service_zone#108]
+- *(4) Sort [new_id#457L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(new_id#457L, 200), true, [id=#537]
+- *(3) Project [struct(pagerank, _2#454._1) AS graphx_attr#462, _1#453L AS new_id#457L]
+- *(3) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#453L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1) AS _2#454]
+- Scan[obj#452]