== Parsed Logical Plan == GlobalLimit 11 +- LocalLimit 11 +- Project [cast(to1Id#353 as string) AS to1Id#365, cast(distanceShortest#346 as string) AS distanceShortest#366] +- Project [to1Id#353, distanceShortest#346] +- Project [id#286, Borough#287, Zone#288, service_zone#289, distances#340, distanceShortest#346, concat_ws(->, cast(id#286 as string), cast(1 as string)) AS to1Id#353] +- Project [id#286, Borough#287, Zone#288, service_zone#289, distances#340, distances#340[cast(1 as bigint)] AS distanceShortest#346] +- Project [id#286, Borough#287, Zone#288, service_zone#289, UDF(distances#290) AS distances#340] +- Project [attr#270.id AS id#286, attr#270.Borough AS Borough#287, attr#270.Zone AS Zone#288, attr#270.service_zone AS service_zone#289, graphx_attr#255.distances AS distances#290] +- Project [attr#270, graphx_attr#255] +- Project [new_id#272L, attr#270, graphx_attr#255] +- Join Inner, (new_id#272L = new_id#250L) :- Project [new_id#272L, attr#270] : +- Project [cast(attr#270.id as bigint) AS new_id#272L, attr#270.id AS id#273, attr#270] : +- Project [struct(id, id#120, Borough, Borough#73, Zone, Zone#74, service_zone, service_zone#75) AS attr#270] : +- Project [LocationID#72 AS id#120, Borough#73, Zone#74, service_zone#75] : +- Relation[LocationID#72,Borough#73,Zone#74,service_zone#75] csv +- Project [struct(distances, graphx_attr#251._1) AS graphx_attr#255, new_id#250L] +- Project [_1#246L AS new_id#250L, _2#247 AS graphx_attr#251] +- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#246L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 11), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 11))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 11))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 11))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#247] +- ExternalRDD [obj#245] == Analyzed Logical Plan == to1Id: string, distanceShortest: string GlobalLimit 11 +- LocalLimit 11 +- Project [cast(to1Id#353 as string) AS to1Id#365, cast(distanceShortest#346 as string) AS distanceShortest#366] +- Project [to1Id#353, distanceShortest#346] +- Project [id#286, Borough#287, Zone#288, service_zone#289, distances#340, distanceShortest#346, concat_ws(->, cast(id#286 as string), cast(1 as string)) AS to1Id#353] +- Project [id#286, Borough#287, Zone#288, service_zone#289, distances#340, distances#340[cast(1 as bigint)] AS distanceShortest#346] +- Project [id#286, Borough#287, Zone#288, service_zone#289, UDF(distances#290) AS distances#340] +- Project [attr#270.id AS id#286, attr#270.Borough AS Borough#287, attr#270.Zone AS Zone#288, attr#270.service_zone AS service_zone#289, graphx_attr#255.distances AS distances#290] +- Project [attr#270, graphx_attr#255] +- Project [new_id#272L, attr#270, graphx_attr#255] +- Join Inner, (new_id#272L = new_id#250L) :- Project [new_id#272L, attr#270] : +- Project [cast(attr#270.id as bigint) AS new_id#272L, attr#270.id AS id#273, attr#270] : +- Project [struct(id, id#120, Borough, Borough#73, Zone, Zone#74, service_zone, service_zone#75) AS attr#270] : +- Project [LocationID#72 AS id#120, Borough#73, Zone#74, service_zone#75] : +- Relation[LocationID#72,Borough#73,Zone#74,service_zone#75] csv +- Project [struct(distances, graphx_attr#251._1) AS graphx_attr#255, new_id#250L] +- Project [_1#246L AS new_id#250L, _2#247 AS graphx_attr#251] +- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#246L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 11), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 11))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 11))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 11))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#247] +- ExternalRDD [obj#245] == Optimized Logical Plan == GlobalLimit 11 +- LocalLimit 11 +- Project [concat_ws(->, cast(attr#270.id as string), 1) AS to1Id#365, cast(UDF(graphx_attr#255.distances)[1] as string) AS distanceShortest#366] +- Join Inner, (new_id#272L = new_id#250L) :- Project [cast(LocationID#72 as bigint) AS new_id#272L, struct(id, LocationID#72, Borough, Borough#73, Zone, Zone#74, service_zone, service_zone#75) AS attr#270] : +- Filter isnotnull(cast(LocationID#72 as bigint)) : +- Relation[LocationID#72,Borough#73,Zone#74,service_zone#75] csv +- Project [struct(distances, _2#247._1) AS graphx_attr#255, _1#246L AS new_id#250L] +- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#246L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#247] +- ExternalRDD [obj#245] == Physical Plan == CollectLimit 11 +- *(2) Project [concat_ws(->, cast(attr#270.id as string), 1) AS to1Id#365, cast(UDF(graphx_attr#255.distances)[1] as string) AS distanceShortest#366] +- *(2) BroadcastHashJoin [new_id#272L], [new_id#250L], Inner, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#238] : +- *(1) Project [cast(LocationID#72 as bigint) AS new_id#272L, struct(id, LocationID#72, Borough, Borough#73, Zone, Zone#74, service_zone, service_zone#75) AS attr#270] : +- *(1) Filter isnotnull(cast(LocationID#72 as bigint)) : +- FileScan csv [LocationID#72,Borough#73,Zone#74,service_zone#75] Batched: false, DataFilters: [isnotnull(cast(LocationID#72 as bigint))], Format: CSV, Location: InMemoryFileIndex[s3a://data-repository-bkt/ECS765/nyc_taxi/taxi_zone_lookup.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<LocationID:int,Borough:string,Zone:string,service_zone:string> +- *(2) Project [struct(distances, _2#247._1) AS graphx_attr#255, _1#246L AS new_id#250L] +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#246L, if (isnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)) null else named_struct(_1, mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, -1))._2), knownnotnull(knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2)._1, None)) AS _2#247] +- Scan[obj#245]