digraph G {
0 [labelType="html" label="<br><b>CollectLimit</b><br><br>"];
subgraph cluster1 {
isCluster="true";
label="WholeStageCodegen (2)\n \nduration: total (min, med, max (stageId: taskId))\n63 ms (0 ms, 0 ms, 9 ms (stage 69.0: task 1978))";
2 [labelType="html" label="<b>HashAggregate</b><br><br>time in aggregation build total (min, med, max (stageId: taskId))<br>27 ms (0 ms, 0 ms, 7 ms (stage 69.0: task 1978))<br>peak memory total (min, med, max (stageId: taskId))<br>791.5 MiB (256.0 KiB, 256.0 KiB, 64.3 MiB (stage 69.0: task 1978))<br>number of output rows: 12<br>avg hash probe bucket list iters (min, med, max (stageId: taskId)):<br>(1, 1, 1 (stage 69.0: task 1978))"];
}
3 [labelType="html" label="<b>Exchange</b><br><br>shuffle records written: 786<br>shuffle write time total (min, med, max (stageId: taskId))<br>228 ms (5 ms, 7 ms, 12 ms (stage 64.0: task 1953))<br>records read: 336<br>local bytes read total (min, med, max (stageId: taskId))<br>10.7 KiB (0.0 B, 0.0 B, 1024.0 B (stage 69.0: task 1978))<br>fetch wait time total (min, med, max (stageId: taskId))<br>10 ms (0 ms, 0 ms, 5 ms (stage 69.0: task 1978))<br>remote bytes read total (min, med, max (stageId: taskId))<br>10.2 KiB (0.0 B, 0.0 B, 1024.0 B (stage 69.0: task 1979))<br>local blocks read: 172<br>remote blocks read: 164<br>data size total (min, med, max (stageId: taskId))<br>18.4 KiB (672.0 B, 672.0 B, 696.0 B (stage 64.0: task 1952))<br>shuffle bytes written total (min, med, max (stageId: taskId))<br>49.1 KiB (1791.0 B, 1792.0 B, 1856.0 B (stage 64.0: task 1952))"];
subgraph cluster4 {
isCluster="true";
label="WholeStageCodegen (1)\n \nduration: total (min, med, max (stageId: taskId))\n2.4 m (4.2 s, 5.0 s, 6.5 s (stage 64.0: task 1945))";
5 [labelType="html" label="<b>HashAggregate</b><br><br>time in aggregation build total (min, med, max (stageId: taskId))<br>2.4 m (4.2 s, 5.0 s, 6.5 s (stage 64.0: task 1950))<br>peak memory total (min, med, max (stageId: taskId))<br>7.0 MiB (256.0 KiB, 256.0 KiB, 256.0 KiB (stage 64.0: task 1946))<br>number of output rows: 786"];
6 [labelType="html" label="<br><b>Project</b><br><br>"];
7 [labelType="html" label="<b>Scan ExistingRDD</b><br><br>number of output rows: 33,832,162"];
}
2->0;
3->2;
5->3;
6->5;
7->6;
}
8
CollectLimit 11
HashAggregate(keys=[year#127], functions=[count(1)])
WholeStageCodegen (2)
Exchange hashpartitioning(year#127, 200), true, [id=#381]
HashAggregate(keys=[year#127], functions=[partial_count(1)])
Project [year(cast(from_unixtime(timestamp_str#3L, yyyy-MM-dd, Some(GMT)) as date)) AS year#127]
Scan ExistingRDD[userId#0L,movieId#1L,rating#2,timestamp_str#3L]
WholeStageCodegen (1)
== Parsed Logical Plan ==
GlobalLimit 11
+- LocalLimit 11
+- Project [cast(year#127 as string) AS year#320, cast(ratings_per_year#315L as string) AS ratings_per_year#321]
+- Aggregate [year#127], [year#127, count(1) AS ratings_per_year#315L]
+- Project [userId#0L, movieId#1L, rating#2, timestamp_str#3L, date#39, month#120, year#127, CASE WHEN ((month#120 >= 1) AND (month#120 <= 6)) THEN Early Year ELSE Late Year END AS time_of_rating#135]
+- Project [userId#0L, movieId#1L, rating#2, timestamp_str#3L, date#39, month#120, year(cast(date#39 as date)) AS year#127]
+- Project [userId#0L, movieId#1L, rating#2, timestamp_str#3L, date#39, month(cast(date#39 as date)) AS month#120]
+- Sort [date#39 ASC NULLS FIRST], true
+- Project [userId#0L, movieId#1L, rating#2, timestamp_str#3L, from_unixtime(timestamp_str#3L, yyyy-MM-dd, Some(GMT)) AS date#39]
+- LogicalRDD [userId#0L, movieId#1L, rating#2, timestamp_str#3L], false
== Analyzed Logical Plan ==
year: string, ratings_per_year: string
GlobalLimit 11
+- LocalLimit 11
+- Project [cast(year#127 as string) AS year#320, cast(ratings_per_year#315L as string) AS ratings_per_year#321]
+- Aggregate [year#127], [year#127, count(1) AS ratings_per_year#315L]
+- Project [userId#0L, movieId#1L, rating#2, timestamp_str#3L, date#39, month#120, year#127, CASE WHEN ((month#120 >= 1) AND (month#120 <= 6)) THEN Early Year ELSE Late Year END AS time_of_rating#135]
+- Project [userId#0L, movieId#1L, rating#2, timestamp_str#3L, date#39, month#120, year(cast(date#39 as date)) AS year#127]
+- Project [userId#0L, movieId#1L, rating#2, timestamp_str#3L, date#39, month(cast(date#39 as date)) AS month#120]
+- Sort [date#39 ASC NULLS FIRST], true
+- Project [userId#0L, movieId#1L, rating#2, timestamp_str#3L, from_unixtime(timestamp_str#3L, yyyy-MM-dd, Some(GMT)) AS date#39]
+- LogicalRDD [userId#0L, movieId#1L, rating#2, timestamp_str#3L], false
== Optimized Logical Plan ==
GlobalLimit 11
+- LocalLimit 11
+- Aggregate [year#127], [cast(year#127 as string) AS year#320, cast(count(1) as string) AS ratings_per_year#321]
+- Project [year(cast(from_unixtime(timestamp_str#3L, yyyy-MM-dd, Some(GMT)) as date)) AS year#127]
+- LogicalRDD [userId#0L, movieId#1L, rating#2, timestamp_str#3L], false
== Physical Plan ==
CollectLimit 11
+- *(2) HashAggregate(keys=[year#127], functions=[count(1)], output=[year#320, ratings_per_year#321])
+- Exchange hashpartitioning(year#127, 200), true, [id=#381]
+- *(1) HashAggregate(keys=[year#127], functions=[partial_count(1)], output=[year#127, count#325L])
+- *(1) Project [year(cast(from_unixtime(timestamp_str#3L, yyyy-MM-dd, Some(GMT)) as date)) AS year#127]
+- *(1) Scan ExistingRDD[userId#0L,movieId#1L,rating#2,timestamp_str#3L]