digraph G {
0 [labelType="html" label="<br><b>CollectLimit</b><br><br>"];
subgraph cluster1 {
isCluster="true";
label="WholeStageCodegen (2)";
2 [labelType="html" label="<br><b>Project</b><br><br>"];
3 [labelType="html" label="<b>BroadcastHashJoin</b><br><br>number of output rows: 21"];
9 [labelType="html" label="<br><b>Project</b><br><br>"];
10 [labelType="html" label="<br><b>SerializeFromObject</b><br><br>"];
}
4 [labelType="html" label="<b>BroadcastExchange</b><br><br>data size: 1026.1 KiB<br>time to collect: 142 ms<br>time to build: 7 ms<br>time to broadcast: 3 ms"];
subgraph cluster5 {
isCluster="true";
label="WholeStageCodegen (1)\n \nduration: 49 ms";
6 [labelType="html" label="<br><b>Project</b><br><br>"];
7 [labelType="html" label="<b>Filter</b><br><br>number of output rows: 265"];
}
8 [labelType="html" label="<b>Scan csv </b><br><br>number of files read: 1<br>metadata time: 0 ms<br>size of files read: 10.5 KiB<br>number of output rows: 265"];
11 [labelType="html" label="<b>Scan</b><br><br>number of output rows: 21"];
2->0;
3->2;
4->3;
6->4;
7->6;
8->7;
9->3;
10->9;
11->10;
}
12
CollectLimit 21
Project [cast(attr#257.id as string) AS id#411, cast(UDF(graphx_attr#300.distances) as string) AS distances#412]
BroadcastHashJoin [new_id#259L], [new_id#295L], Inner, BuildLeft
Project [struct(distances, _2#292._1) AS graphx_attr#300, _1#291L AS new_id#295L]
SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#291L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#292]
WholeStageCodegen (2)
BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#423]
Project [cast(LocationID#72 as bigint) AS new_id#259L, struct(id, LocationID#72, Borough, Borough#73, Zone, Zone#74, service_zone, service_zone#75) AS attr#257]
Filter isnotnull(cast(LocationID#72 as bigint))
WholeStageCodegen (1)
FileScan csv [LocationID#72,Borough#73,Zone#74,service_zone#75] Batched: false, DataFilters: [isnotnull(cast(LocationID#72 as bigint))], Format: CSV, Location: InMemoryFileIndex[s3a://data-repository-bkt/ECS765/nyc_taxi/taxi_zone_lookup.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<LocationID:int,Borough:string,Zone:string,service_zone:string>
Scan[obj#290]
== Parsed Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
+- Project [cast(id#322 as string) AS id#411, cast(distances#376 as string) AS distances#412]
+- Project [id#322, distances#376]
+- Project [id#322, Borough#323, Zone#324, service_zone#325, UDF(distances#326) AS distances#376]
+- Project [attr#257.id AS id#322, attr#257.Borough AS Borough#323, attr#257.Zone AS Zone#324, attr#257.service_zone AS service_zone#325, graphx_attr#300.distances AS distances#326]
+- Project [attr#257, graphx_attr#300]
+- Project [new_id#259L, attr#257, graphx_attr#300]
+- Join Inner, (new_id#259L = new_id#295L)
:- Project [new_id#259L, attr#257]
: +- Project [cast(attr#257.id as bigint) AS new_id#259L, attr#257.id AS id#260, attr#257]
: +- Project [struct(id, id#80, Borough, Borough#73, Zone, Zone#74, service_zone, service_zone#75) AS attr#257]
: +- Project [LocationID#72 AS id#80, Borough#73, Zone#74, service_zone#75]
: +- Relation[LocationID#72,Borough#73,Zone#74,service_zone#75] csv
+- Project [struct(distances, graphx_attr#296._1) AS graphx_attr#300, new_id#295L]
+- Project [_1#291L AS new_id#295L, _2#292 AS graphx_attr#296]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#291L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#292]
+- ExternalRDD [obj#290]
== Analyzed Logical Plan ==
id: string, distances: string
GlobalLimit 21
+- LocalLimit 21
+- Project [cast(id#322 as string) AS id#411, cast(distances#376 as string) AS distances#412]
+- Project [id#322, distances#376]
+- Project [id#322, Borough#323, Zone#324, service_zone#325, UDF(distances#326) AS distances#376]
+- Project [attr#257.id AS id#322, attr#257.Borough AS Borough#323, attr#257.Zone AS Zone#324, attr#257.service_zone AS service_zone#325, graphx_attr#300.distances AS distances#326]
+- Project [attr#257, graphx_attr#300]
+- Project [new_id#259L, attr#257, graphx_attr#300]
+- Join Inner, (new_id#259L = new_id#295L)
:- Project [new_id#259L, attr#257]
: +- Project [cast(attr#257.id as bigint) AS new_id#259L, attr#257.id AS id#260, attr#257]
: +- Project [struct(id, id#80, Borough, Borough#73, Zone, Zone#74, service_zone, service_zone#75) AS attr#257]
: +- Project [LocationID#72 AS id#80, Borough#73, Zone#74, service_zone#75]
: +- Relation[LocationID#72,Borough#73,Zone#74,service_zone#75] csv
+- Project [struct(distances, graphx_attr#296._1) AS graphx_attr#300, new_id#295L]
+- Project [_1#291L AS new_id#295L, _2#292 AS graphx_attr#296]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#291L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 1))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#292]
+- ExternalRDD [obj#290]
== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
+- Project [cast(attr#257.id as string) AS id#411, cast(UDF(graphx_attr#300.distances) as string) AS distances#412]
+- Join Inner, (new_id#259L = new_id#295L)
:- Project [cast(LocationID#72 as bigint) AS new_id#259L, struct(id, LocationID#72, Borough, Borough#73, Zone, Zone#74, service_zone, service_zone#75) AS attr#257]
: +- Filter isnotnull(cast(LocationID#72 as bigint))
: +- Relation[LocationID#72,Borough#73,Zone#74,service_zone#75] csv
+- Project [struct(distances, _2#292._1) AS graphx_attr#300, _1#291L AS new_id#295L]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#291L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#292]
+- ExternalRDD [obj#290]
== Physical Plan ==
CollectLimit 21
+- *(2) Project [cast(attr#257.id as string) AS id#411, cast(UDF(graphx_attr#300.distances) as string) AS distances#412]
+- *(2) BroadcastHashJoin [new_id#259L], [new_id#295L], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#423]
: +- *(1) Project [cast(LocationID#72 as bigint) AS new_id#259L, struct(id, LocationID#72, Borough, Borough#73, Zone, Zone#74, service_zone, service_zone#75) AS attr#257]
: +- *(1) Filter isnotnull(cast(LocationID#72 as bigint))
: +- FileScan csv [LocationID#72,Borough#73,Zone#74,service_zone#75] Batched: false, DataFilters: [isnotnull(cast(LocationID#72 as bigint))], Format: CSV, Location: InMemoryFileIndex[s3a://data-repository-bkt/ECS765/nyc_taxi/taxi_zone_lookup.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<LocationID:int,Borough:string,Zone:string,service_zone:string>
+- *(2) Project [struct(distances, _2#292._1) AS graphx_attr#300, _1#291L AS new_id#295L]
+- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#291L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#292]
+- Scan[obj#290]