digraph G {
// Operator graph for a single query execution.
// NOTE(review): the operator names (WholeStageCodegen, SortMergeJoin, Exchange, ...) suggest
// this is a Spark SQL UI plan-graph dump — confirm against the producer of this file.
//
// Each numbered node is one operator; its label holds the operator name followed by
// whatever metrics were recorded for it. Edges are listed at the bottom and point from
// an operator to the operator that consumes its output, so every path ends at node 0.
//
// labelType="html" and isCluster="true" are not standard Graphviz attributes; presumably
// they are hints for the renderer that reads this graph — TODO confirm.

// Node 0: the only node without an outgoing edge (sink of the graph).
0 [labelType="html" label="<br><b>CollectLimit</b><br><br>"];
// Cluster labelled "WholeStageCodegen (5)": the join (node 3) and the projection applied
// to its output (node 2). Both branches of the graph meet at node 3.
subgraph cluster1 {
isCluster="true";
label="WholeStageCodegen (5)\n \nduration: total (min, med, max (stageId: taskId))\n7 ms (0 ms, 0 ms, 3 ms (stage 109.0: task 1321))";
2 [labelType="html" label="<br><b>Project</b><br><br>"];
3 [labelType="html" label="<b>SortMergeJoin</b><br><br>number of output rows: 12"];
}
// Cluster labelled "WholeStageCodegen (2)": sort feeding the join from the first branch (5->3).
subgraph cluster4 {
isCluster="true";
label="WholeStageCodegen (2)\n \nduration: total (min, med, max (stageId: taskId))\n125 ms (2 ms, 5 ms, 41 ms (stage 81.0: task 1316))";
5 [labelType="html" label="<b>Sort</b><br><br>sort time total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 109.0: task 1323))<br>peak memory total (min, med, max (stageId: taskId))<br>96.5 MiB (64.0 KiB, 16.1 MiB, 16.1 MiB (stage 109.0: task 1321))<br>spill size total (min, med, max (stageId: taskId))<br>0.0 B (0.0 B, 0.0 B, 0.0 B (stage 109.0: task 1323))"];
}
// Node 6: Exchange of the first branch, sitting between cluster7 (8->6) and cluster4 (6->5).
// Its label mixes write-side metrics (tagged stage 68.0) with read-side metrics (tagged stage 109.0).
6 [labelType="html" label="<b>Exchange</b><br><br>shuffle records written: 265<br>shuffle write time total (min, med, max (stageId: taskId))<br>301 ms (5 ms, 13 ms, 50 ms (stage 68.0: task 1297))<br>records read: 12<br>local bytes read total (min, med, max (stageId: taskId))<br>1118.0 B (0.0 B, 141.0 B, 276.0 B (stage 109.0: task 1321))<br>fetch wait time total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 109.0: task 1321))<br>remote bytes read total (min, med, max (stageId: taskId))<br>437.0 B (0.0 B, 0.0 B, 292.0 B (stage 109.0: task 1321))<br>local blocks read: 8<br>remote blocks read: 3<br>data size total (min, med, max (stageId: taskId))<br>28.4 KiB (1680.0 B, 1760.0 B, 2.7 KiB (stage 68.0: task 1299))<br>shuffle bytes written total (min, med, max (stageId: taskId))<br>35.1 KiB (2.0 KiB, 2.1 KiB, 3.2 KiB (stage 68.0: task 1299))"];
// Cluster labelled "WholeStageCodegen (1)": start of the first branch, chained 10->9->8.
subgraph cluster7 {
isCluster="true";
label="WholeStageCodegen (1)\n \nduration: total (min, med, max (stageId: taskId))\n767 ms (29 ms, 40 ms, 72 ms (stage 68.0: task 1297))";
8 [labelType="html" label="<br><b>Project</b><br><br>"];
9 [labelType="html" label="<b>Filter</b><br><br>number of output rows: 265"];
10 [labelType="html" label="<b>Scan ExistingRDD</b><br><br>number of output rows: 265"];
}
// Cluster labelled "WholeStageCodegen (4)": sort feeding the join from the second branch (12->3).
subgraph cluster11 {
isCluster="true";
label="WholeStageCodegen (4)\n \nduration: total (min, med, max (stageId: taskId))\n54 ms (0 ms, 1 ms, 26 ms (stage 81.0: task 1316))";
12 [labelType="html" label="<b>Sort</b><br><br>sort time total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 109.0: task 1321))<br>peak memory total (min, med, max (stageId: taskId))<br>96.4 MiB (0.0 B, 16.1 MiB, 16.1 MiB (stage 109.0: task 1321))<br>spill size total (min, med, max (stageId: taskId))<br>0.0 B (0.0 B, 0.0 B, 0.0 B (stage 109.0: task 1321))"];
}
// Node 13: Exchange of the second branch, sitting between cluster14 (15->13) and cluster11 (13->12).
// Write-side metrics are tagged stage 80.0; read-side metrics are tagged stages 109.0 and 95.0.
13 [labelType="html" label="<b>Exchange</b><br><br>shuffle records written: 265<br>shuffle write time total (min, med, max (stageId: taskId))<br>141 ms (4 ms, 9 ms, 11 ms (stage 80.0: task 1308))<br>records read: 12<br>local bytes read total (min, med, max (stageId: taskId))<br>606.0 B (0.0 B, 101.0 B, 303.0 B (stage 109.0: task 1321))<br>fetch wait time total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 109.0: task 1321))<br>remote bytes read total (min, med, max (stageId: taskId))<br>606.0 B (0.0 B, 101.0 B, 202.0 B (stage 95.0: task 1317))<br>local blocks read: 6<br>remote blocks read: 6<br>data size total (min, med, max (stageId: taskId))<br>22.1 KiB (1248.0 B, 1416.0 B, 1496.0 B (stage 80.0: task 1300))<br>shuffle bytes written total (min, med, max (stageId: taskId))<br>24.7 KiB (1417.0 B, 1610.0 B, 1692.0 B (stage 80.0: task 1304))"];
// Cluster labelled "WholeStageCodegen (3)": serialization and projection of the second branch (16->15).
subgraph cluster14 {
isCluster="true";
label="WholeStageCodegen (3)\n \nduration: total (min, med, max (stageId: taskId))\n284 ms (13 ms, 18 ms, 20 ms (stage 80.0: task 1314))";
15 [labelType="html" label="<br><b>Project</b><br><br>"];
16 [labelType="html" label="<br><b>SerializeFromObject</b><br><br>"];
}
// Node 17: scan that starts the second branch; it is not placed inside any cluster.
17 [labelType="html" label="<b>Scan</b><br><br>number of output rows: 265"];
// Edges.
//   First branch:  10 -> 9 -> 8 -> 6 -> 5 -> 3
//   Second branch: 17 -> 16 -> 15 -> 13 -> 12 -> 3
//   After the two branches meet at node 3: 3 -> 2 -> 0
2->0;
3->2;
5->3;
6->5;
8->6;
9->8;
10->9;
12->3;
13->12;
15->13;
16->15;
17->16;
}
18
CollectLimit 11
Project [concat(cast(attr#323.id as string), ->1) AS id_to_1#405, cast(UDF(graphx_attr#308.distances)[1] as string) AS shortest_distance#406]
SortMergeJoin [new_id#325L], [new_id#303L], Inner
WholeStageCodegen (5)
Sort [new_id#325L ASC NULLS FIRST], false, 0
WholeStageCodegen (2)
Exchange hashpartitioning(new_id#325L, 200), true, [id=#436]
Project [cast(id#105 as bigint) AS new_id#325L, struct(id, id#105, Borough, Borough#106, Zone, Zone#107, service_zone, service_zone#108) AS attr#323]
Filter isnotnull(cast(id#105 as bigint))
Scan ExistingRDD[id#105,Borough#106,Zone#107,service_zone#108]
WholeStageCodegen (1)
Sort [new_id#303L ASC NULLS FIRST], false, 0
WholeStageCodegen (4)
Exchange hashpartitioning(new_id#303L, 200), true, [id=#445]
Project [struct(distances, _2#300._1) AS graphx_attr#308, _1#299L AS new_id#303L]
SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#299L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#300]
WholeStageCodegen (3)
Scan[obj#298]
== Parsed Logical Plan ==
GlobalLimit 11
+- LocalLimit 11
   +- Project [cast(id_to_1#399 as string) AS id_to_1#405, cast(shortest_distance#400 as string) AS shortest_distance#406]
      +- Project [concat(cast(id#339 as string), ->1) AS id_to_1#399, distances#393[cast(1 as bigint)] AS shortest_distance#400]
         +- Project [id#339, Borough#340, Zone#341, service_zone#342, UDF(distances#343) AS distances#393]
            +- Project [attr#323.id AS id#339, attr#323.Borough AS Borough#340, attr#323.Zone AS Zone#341, attr#323.service_zone AS service_zone#342, graphx_attr#308.distances AS distances#343]
               +- Project [attr#323, graphx_attr#308]
                  +- Project [new_id#325L, attr#323, graphx_attr#308]
                     +- Join Inner, (new_id#325L = new_id#303L)
                        :- Project [new_id#325L, attr#323]
                        :  +- Project [cast(attr#323.id as bigint) AS new_id#325L, attr#323.id AS id#326, attr#323]
                        :     +- Project [struct(id, id#105, Borough, Borough#106, Zone, Zone#107, service_zone, service_zone#108) AS attr#323]
                        :        +- LogicalRDD [id#105, Borough#106, Zone#107, service_zone#108], false
                        +- Project [struct(distances, graphx_attr#304._1) AS graphx_attr#308, new_id#303L]
                           +- Project [_1#299L AS new_id#303L, _2#300 AS graphx_attr#304]
                              +- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#299L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#300]
                                 +- ExternalRDD [obj#298]
== Analyzed Logical Plan ==
id_to_1: string, shortest_distance: string
GlobalLimit 11
+- LocalLimit 11
   +- Project [cast(id_to_1#399 as string) AS id_to_1#405, cast(shortest_distance#400 as string) AS shortest_distance#406]
      +- Project [concat(cast(id#339 as string), ->1) AS id_to_1#399, distances#393[cast(1 as bigint)] AS shortest_distance#400]
         +- Project [id#339, Borough#340, Zone#341, service_zone#342, UDF(distances#343) AS distances#393]
            +- Project [attr#323.id AS id#339, attr#323.Borough AS Borough#340, attr#323.Zone AS Zone#341, attr#323.service_zone AS service_zone#342, graphx_attr#308.distances AS distances#343]
               +- Project [attr#323, graphx_attr#308]
                  +- Project [new_id#325L, attr#323, graphx_attr#308]
                     +- Join Inner, (new_id#325L = new_id#303L)
                        :- Project [new_id#325L, attr#323]
                        :  +- Project [cast(attr#323.id as bigint) AS new_id#325L, attr#323.id AS id#326, attr#323]
                        :     +- Project [struct(id, id#105, Borough, Borough#106, Zone, Zone#107, service_zone, service_zone#108) AS attr#323]
                        :        +- LogicalRDD [id#105, Borough#106, Zone#107, service_zone#108], false
                        +- Project [struct(distances, graphx_attr#304._1) AS graphx_attr#308, new_id#303L]
                           +- Project [_1#299L AS new_id#303L, _2#300 AS graphx_attr#304]
                              +- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#299L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#300]
                                 +- ExternalRDD [obj#298]
== Optimized Logical Plan ==
GlobalLimit 11
+- LocalLimit 11
   +- Project [concat(cast(attr#323.id as string), ->1) AS id_to_1#405, cast(UDF(graphx_attr#308.distances)[1] as string) AS shortest_distance#406]
      +- Join Inner, (new_id#325L = new_id#303L)
         :- Project [cast(id#105 as bigint) AS new_id#325L, struct(id, id#105, Borough, Borough#106, Zone, Zone#107, service_zone, service_zone#108) AS attr#323]
         :  +- Filter isnotnull(cast(id#105 as bigint))
         :     +- LogicalRDD [id#105, Borough#106, Zone#107, service_zone#108], false
         +- Project [struct(distances, _2#300._1) AS graphx_attr#308, _1#299L AS new_id#303L]
            +- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#299L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#300]
               +- ExternalRDD [obj#298]
== Physical Plan ==
CollectLimit 11
+- *(5) Project [concat(cast(attr#323.id as string), ->1) AS id_to_1#405, cast(UDF(graphx_attr#308.distances)[1] as string) AS shortest_distance#406]
   +- *(5) SortMergeJoin [new_id#325L], [new_id#303L], Inner
      :- *(2) Sort [new_id#325L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(new_id#325L, 200), true, [id=#436]
      :     +- *(1) Project [cast(id#105 as bigint) AS new_id#325L, struct(id, id#105, Borough, Borough#106, Zone, Zone#107, service_zone, service_zone#108) AS attr#323]
      :        +- *(1) Filter isnotnull(cast(id#105 as bigint))
      :           +- *(1) Scan ExistingRDD[id#105,Borough#106,Zone#107,service_zone#108]
      +- *(4) Sort [new_id#303L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(new_id#303L, 200), true, [id=#445]
            +- *(3) Project [struct(distances, _2#300._1) AS graphx_attr#308, _1#299L AS new_id#303L]
               +- *(3) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#299L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#300]
                  +- Scan[obj#298]