 How to calculate Centered Moving Average of a set of data in Hadoop Map-Reduce?

I want to calculate the centered moving average of a set of data.

Example input format:

quarter | sales
Q1'11   | 9
Q2'11   | 8
Q3'11   | 9
Q4'11   | 12
Q1'12   | 9
Q2'12   | 12
Q3'12   | 9
Q4'12   | 10

Mathematical representation of the data, showing the 4-period moving average (MA) and then the centered moving average:

Period  Value  MA     Centered
1       9
1.5
2       8
2.5            9.5
3       9             9.5
3.5            9.5
4       12            10.0
4.5            10.5
5       9             10.5
5.5            10.5
6       12
6.5
7       9
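
For reference, the calculation in the table can be sketched outside Hadoop in a few lines of plain Python. This is only an illustration of the arithmetic, assuming a 4-period window and periods numbered from 1; the function names are made up for this example.

```python
def moving_average(values, window=4):
    """Return {center_position: mean} for every full window (periods are 1-based)."""
    result = {}
    for start in range(len(values) - window + 1):
        chunk = values[start:start + window]
        # The window covers periods start+1 .. start+window, so with an even
        # window its center falls halfway between two periods (e.g. 2.5).
        center = (start + 1 + start + window) / 2
        result[center] = sum(chunk) / window
    return result


def centered_moving_average(values, window=4):
    """Average adjacent moving averages so each result lands on a whole period."""
    ma = moving_average(values, window)
    centers = sorted(ma)
    return {
        (left + right) / 2: (ma[left] + ma[right]) / 2
        for left, right in zip(centers, centers[1:])
    }


sales = [9, 8, 9, 12, 9, 12, 9, 10]
print(moving_average(sales))
print(centered_moving_average(sales))
```

The second step is only needed because the window length is even; it moves the averages from the half positions back onto whole periods.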

I am stuck on the implementation of a RecordReader that would give the mapper the sales values of a whole year, i.e. of four quarters.

This is actually totally doable in the MapReduce paradigm; it does not have to be thought of as a 'sliding window'. Instead, think of the fact that each data point is relevant to at most four MA calculations, and remember that each call to the map function can emit more than one key-value pair. Here is pseudo-code, assuming the quarters are numbered 1, 2, 3, ...:

First MR job:

map(quarter, sales)
    emit(quarter - 1.5, sales)
    emit(quarter - 0.5, sales)
    emit(quarter + 0.5, sales)
    emit(quarter + 1.5, sales)

reduce(quarter, list_of_sales)
    if (list_of_sales.length == 4):
        emit(quarter, average(list_of_sales))
    endif

Second MR job:

map(quarter, MA)
    emit(quarter - 0.5, MA)
    emit(quarter + 0.5, MA)

reduce(quarter, list_of_MA)
    if (list_of_MA.length == 2):
        emit(quarter, average(list_of_MA))
    endif
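
To check the logic without a cluster, the two jobs can be simulated in memory. The sketch below is plain Python, not Hadoop API: run_job is a hypothetical stand-in for the map, shuffle and reduce phases, and the quarters are assumed to have already been converted to the numbers 1 to 8.

```python
from collections import defaultdict


def run_job(records, mapper, reducer):
    """Tiny in-memory stand-in for one MapReduce job: map, group by key, reduce."""
    groups = defaultdict(list)
    for key, value in records:
        for out_key, out_value in mapper(key, value):
            groups[out_key].append(out_value)
    output = []
    for key in sorted(groups):
        output.extend(reducer(key, groups[key]))
    return output


def ma_map(quarter, sales):
    # Each sale contributes to the four windows centered at these positions.
    for offset in (-1.5, -0.5, 0.5, 1.5):
        yield quarter + offset, sales


def ma_reduce(center, values):
    # Only windows that received all four quarters are complete.
    if len(values) == 4:
        yield center, sum(values) / 4


def cma_map(center, ma):
    # Each moving average contributes to the two whole quarters next to it.
    yield center - 0.5, ma
    yield center + 0.5, ma


def cma_reduce(quarter, values):
    # Only quarters with a moving average on both sides are kept.
    if len(values) == 2:
        yield quarter, sum(values) / 2


sales = [(1, 9), (2, 8), (3, 9), (4, 12), (5, 9), (6, 12), (7, 9), (8, 10)]
moving_averages = run_job(sales, ma_map, ma_reduce)
centered = run_job(moving_averages, cma_map, cma_reduce)
print(centered)
```

For the sample data this gives 9.5 for quarter 3 and 10.0 for quarter 4. The half-step keys are exact as floats here; in a real job it may be simpler to double them and use integer keys, though that is a design choice rather than something the approach requires.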

To the best of my understanding, a moving average does not map nicely onto the MapReduce paradigm, since its calculation is essentially a "sliding window" over sorted data, while MR processes non-intersecting ranges of sorted data.
The solution I see is as follows:
a) Implement a custom partitioner that produces two different partitionings in two runs. In each run your reducers will get different ranges of data and calculate the moving averages where appropriate.
I will try to illustrate.
In the first run the data for the reducers should be:
R1: Q1, Q2, Q3, Q4
R2: Q5, Q6, Q7, Q8
...

Here you will calculate the moving averages for some of the Qs.

In the next run your reducers should get data like:
R1: Q1...Q6
R2: Q6...Q10
R3: Q10...Q14

And calculate the rest of the moving averages.
Then you will need to aggregate the results.

The idea of the custom partitioner is that it has two modes of operation, each time dividing the keys into equal ranges but with some shift. In pseudocode it looks like this:
partition = (key + SHIFT) / (MAX_KEY / numOfPartitions);
where:
SHIFT is taken from the configuration.
MAX_KEY is the maximum value of the key. I assume for simplicity that keys start at zero.
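
A minimal sketch of that formula in plain Python (not Hadoop code) shows how the shift moves the range boundaries between runs. Two details are my own assumptions rather than part of the formula above: the range width is computed from the number of keys (max_key + 1) using ceiling division, and the result is clamped so that shifted keys near the top do not overflow past the last partition. group_by_partition is a hypothetical helper used only to display the result.

```python
def partition(key, shift, max_key, num_partitions):
    """Range-partition integer keys 0..max_key, with boundaries moved by `shift`."""
    # Width of each range, rounded up so every key fits (assumption, see lead-in).
    range_size = -(-(max_key + 1) // num_partitions)
    # Clamp: without it, shifted keys near max_key would get an index that
    # is >= num_partitions.
    return min((key + shift) // range_size, num_partitions - 1)


def group_by_partition(keys, shift, max_key, num_partitions):
    """Show which keys each reducer would receive for a given shift."""
    groups = [[] for _ in range(num_partitions)]
    for key in keys:
        groups[partition(key, shift, max_key, num_partitions)].append(key)
    return groups


keys = range(12)
first_run = group_by_partition(keys, shift=0, max_key=11, num_partitions=3)
second_run = group_by_partition(keys, shift=2, max_key=11, num_partitions=3)
print(first_run)
print(second_run)
```

With a shift of 2, a window such as keys 2 to 5, which straddles the boundary between the first two reducers in the first run, falls entirely inside one reducer in the second run. The clamped last partition ends up larger than the others, so a real implementation might prefer a different way of handling the overflow.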

A RecordReader, IMHO, is not a solution, since it is limited to a specific split and cannot slide over the split's boundary.

Another solution would be to implement custom logic for splitting the input data (this is part of the InputFormat). It could produce two different slides over the data, similar to the partitioning.