Posts: 41
Threads: 20
Joined: Apr 2023
Nov-13-2023, 07:05 PM
(This post was last modified: Nov-13-2023, 07:05 PM by SuchUmami.)
Let's say I have a list of values like this:
100
99
90
79
62
94
88
75
80
72
74
87
84
90
I would like to look for the peaks and troughs in the numbers. However, I want to ignore small movements because the data may not go down consecutively until it reaches the trough and I would like to remove the ‘noise’ from the data.
I want to ignore small changes in price that are 10% or lower than the original peak or trough. This 10% move could span over several rows, it's a cumulative move.
It’s easy to see from the data that the first trough comes at 62, because no move in the opposite direction exceeds 10% before that point.
It is also easy to see the next peak: 94 is far more than 10% above 62, and the values start moving back down after that.
However the tricky part is after 75 it goes upwards to 80 which I’d like to ignore because the next row finally reaches a trough of 72. This small upward movement isn't enough to invalidate the trend.
Then it goes to 74, 87, 84 & 90 which qualifies for a new peak because it is over 10% and doesn’t get invalidated by a 10% movement in the opposite direction.
So if I were a great programmer, I’d like to make a printout of the data like this:
First peak = 100
First trough = 62
Second peak = 94
Second trough = 72
Third peak = 90
It’s quite easy to do this as a human but I’m having a lot of trouble programming it in python. I would be extremely grateful if anyone could help me with this problem.
Posts: 6,778
Threads: 20
Joined: Feb 2020
Nov-13-2023, 09:54 PM
(This post was last modified: Nov-13-2023, 09:54 PM by deanhystad.)
I don't think it is easy to do as a human. Write down a procedure that you could give to somebody who is unfamiliar with the problem. It has to have enough detail that they could pick peaks and troughs without you having to provide an example or do any teaching. If you can do that, you can easily convert your procedure to a python program. If you cannot write down a procedure you are being held back by not understanding the problem.
For example, I don't know what I should do with this series.
50, 51, 50, 49, 50, 51, 50, 49
Is there a peak? a valley?
What about this series?
50, 51, 52, 51, 50, 49, 48, 47, 46, 45, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55?
Posts: 4,780
Threads: 76
Joined: Jan 2018
Nov-14-2023, 06:48 AM
(This post was last modified: Nov-14-2023, 06:48 AM by Gribouillis.)
(Nov-13-2023, 07:05 PM)SuchUmami Wrote: I want to ignore small changes in price that are 10% or lower than the original peak or trough. This 10% move could span over several rows, it's a cumulative move.
This rule needs more details. 10% of what exactly? What exactly is cumulative?
You could also check whether existing noise-reduction filters from signal processing address this problem.
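As a rough illustration of the filtering idea, here is a minimal sketch of a simple moving average in plain Python. The helper name `moving_average` and the window size of 3 are assumptions made for this example, not something proposed in the thread.

```python
def moving_average(values, window=3):
    """Return the simple moving average over each full window of values."""
    if window < 1 or window > len(values):
        raise ValueError("window must be between 1 and len(values)")
    return [
        sum(values[i:i + window]) / window
        for i in range(len(values) - window + 1)
    ]


data = [100, 99, 90, 79, 62, 94, 88, 75, 80, 72, 74, 87, 84, 90]
smoothed = moving_average(data, window=3)
print([round(x, 1) for x in smoothed])
```

A filter like this flattens the extremes (the smoothed series never gets down to 62), so it may not suit a requirement to report the raw peak and trough values.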
Posts: 41
Threads: 20
Joined: Apr 2023
(Nov-13-2023, 09:54 PM)deanhystad Wrote: For example, I don't know what I should do with this series.
50, 51, 50, 49, 50, 51, 50, 49
Is there a peak? a valley?
If that was all there was, then I guess it should be considered neutral because there's not been a 10% change. However, if a number before that was 60, it would be part of one downtrend from 60 to 49.
(Nov-13-2023, 09:54 PM)deanhystad Wrote: What about this series?
50, 51, 52, 51, 50, 49, 48, 47, 46, 45, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55?
Again, I suppose it should be considered neutral because there isn't a 10% change anywhere. I didn't think about it because the actual dataset that I would be working with will always contain 10%+ changes.
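One way to test statements like these is to measure the largest cumulative drop from a running high and the largest cumulative rise from a running low. In the minimal sketch below (the helper name `largest_moves` is hypothetical), the first series stays under 10% in both directions. The second, however, appears to contain a drop of about 15% (52 down to 44) and a rise of 25% (44 up to 55), so under a cumulative rule it would probably not count as neutral.

```python
def largest_moves(values):
    """Return (largest_drop, largest_rise) as fractions of the running extreme."""
    high = low = values[0]
    largest_drop = largest_rise = 0.0
    for value in values[1:]:
        high = max(high, value)
        low = min(low, value)
        # Drop measured from the highest value seen so far
        largest_drop = max(largest_drop, (high - value) / high)
        # Rise measured from the lowest value seen so far
        largest_rise = max(largest_rise, (value - low) / low)
    return largest_drop, largest_rise


series_1 = [50, 51, 50, 49, 50, 51, 50, 49]
series_2 = [50, 51, 52, 51, 50, 49, 48, 47, 46, 45, 44,
            45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55]

drop_1, rise_1 = largest_moves(series_1)
drop_2, rise_2 = largest_moves(series_2)
print(f"series 1: drop {drop_1:.1%}, rise {rise_1:.1%}")
print(f"series 2: drop {drop_2:.1%}, rise {rise_2:.1%}")
```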
Posts: 41
Threads: 20
Joined: Apr 2023
Here is what I've currently got:
import pandas as pd

def detect_trend_change(data, threshold=10):
    trend_dataframes = []
    trend = None
    trend_sum = 0
    trend_points = []
    for i in range(1, len(data)):
        change_percent = 100 * (data[i] - data[i-1]) / data[i-1]
        current_trend = 'Uptrend' if change_percent > 0 else 'Downtrend'
        if trend is None:
            trend = current_trend
            trend_points.append(data[i-1])
        if current_trend == trend:
            trend_sum += abs(change_percent)
            trend_points.append(data[i])
        else:
            if abs(trend_sum) >= threshold:
                df = pd.DataFrame({'Value': trend_points, 'Trend': trend})
                trend_dataframes.append(df)
                trend_points = [data[i-1], data[i]]
                trend_sum = abs(change_percent)
                trend = current_trend
            else:
                trend_sum += abs(change_percent)
                trend_points.append(data[i])
    # Add the last trend
    if trend_points:
        df = pd.DataFrame({'Value': trend_points, 'Trend': trend})
        trend_dataframes.append(df)
    return trend_dataframes

# Provided dataset
dataset = [100, 99, 90, 79, 62, 94, 88, 75, 80, 72, 74, 87, 84, 90]

# Call the function with the dataset
resulting_dataframes = detect_trend_change(dataset)

# Print the resulting dataframes
for idx, df in enumerate(resulting_dataframes, 1):
    print(f"DataFrame {idx}:\n{df}\n")

However, it's coming out as:
DataFrame 1:
Value Trend
0 100 Downtrend
1 99 Downtrend
2 90 Downtrend
3 79 Downtrend
4 62 Downtrend
DataFrame 2:
Value Trend
0 62 Uptrend
1 94 Uptrend
DataFrame 3:
Value Trend
0 94 Downtrend
1 88 Downtrend
2 75 Downtrend
DataFrame 4:
Value Trend
0 75 Uptrend
1 80 Uptrend
2 72 Uptrend
3 74 Uptrend
4 87 Uptrend
DataFrame 5:
Value Trend
0 87 Downtrend
1 84 Downtrend
2 90 Downtrend
The errors start at DataFrame 3, which should have continued down to 72 because there was no 10% up-move to invalidate the downtrend. The next trend should then have run from 72 to 90 as an uptrend.
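One possible reading of why DataFrame 3 stops at 75: when the direction flips, the function compares the accumulated size of the old trend with the threshold, not the size of the counter-move. The sketch below redoes that arithmetic for the rows 94, 88, 75, 80; the variable names are illustrative and do not come from the code above.

```python
segment = [94, 88, 75, 80]

# Per-row percentage changes, computed the same way as change_percent
step_changes = [100 * (b - a) / a for a, b in zip(segment, segment[1:])]

# What trend_sum holds when the row 80 arrives: the size of the downtrend so far
downtrend_sum = sum(abs(change) for change in step_changes[:2])

# The size of the counter-move itself (75 up to 80)
counter_move = step_changes[2]

print(f"downtrend so far: {downtrend_sum:.1f}%")
print(f"counter-move:     {counter_move:.1f}%")
```

The downtrend total is already above 10, so the trend is closed at 75 even though the counter-move is under 10%. Comparing the counter-move, measured from the trend's running extreme, against the threshold would be closer to the rule described in the first post.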
Posts: 6,778
Threads: 20
Joined: Feb 2020
Nov-20-2023, 04:47 AM
(This post was last modified: Nov-21-2023, 04:35 AM by deanhystad.)
This might do what you want. It splits the data into a bunch of localized peak valleys using your 10% change threshold. The localized peak valleys for [100, 99, 90, 79, 62, 94, 88, 75, 80, 72, 74, 87, 84, 90] are [100, 99, 90], [90, 79], [79, 62], [62, 94], [94, 88, 75], [75, 80, 72, 74, 87], [87, 84, 90]. This is done by the Trend class, a list with a special append() method.
The Trends class creates a list of trends. When a Trend is appended to the list, Trends combines the new trend with the previous trend. If both trends trend in the same direction, the new trend is absorbed into the previous trend. If the new trend changes direction, Trends searches for the peak or valley (depending on direction) in the new trend. This is the start of the new trend, and points prior to this are moved to the previous trend.
class Trend(list):
    """A list of numbers that trend in the same direction."""
    def __init__(self, value=None):
        super().__init__()
        self.direction = 0
        super().append(value)

    def append(self, value):
        """Add number to Trend. Return direction of trend (-1, 0, 1)."""
        super().append(value)
        if value <= self[0] * 0.9 or value >= self[0] * 1.1:
            self.direction = 1 if self[-1] > self[0] else -1
        return self.direction


class Trends(list):
    """Convert list of numbers to a list of trends."""
    def __init__(self, values):
        super().__init__()
        trend = Trend(values[0])
        for value in values[1:]:
            if trend.append(value) != 0:
                self._add_trend(trend)
                trend = Trend(value)
        self._add_trend(trend)

    def _add_trend(self, trend):
        """Combine last trend into list"""
        prev = self[-1] if self else None
        if prev:
            i = trend.index(max(trend) if prev.direction == 1 else min(trend))
            prev.extend(trend[1:i+1])
            del trend[:i]
        if len(trend) > 1:
            super().append(trend)


print(Trends([100, 99, 90, 79, 62, 94, 88, 75, 80, 72, 74, 87, 84, 90]))

And as a generator without using classes:
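def trends(numbers):
    """Yield lists of numbers, one list per trend."""
    prev = None
    trend = []
    prev_dir = trend_dir = 0
    for number in numbers:
        trend.append(number)
        # Has this chunk moved more than 10% away from its first value?
        if number > trend[0] * 1.1 or number < trend[0] * 0.9:
            trend_dir = 1 if number > trend[0] else -1
            if prev:
                if trend_dir in (0, prev_dir):
                    # Same direction: absorb the chunk into the previous trend.
                    # trend[0] is already the last item of prev, so skip it.
                    prev.extend(trend[1:])
                else:
                    # Direction changed: the previous trend ends at the elbow
                    elbow = min(trend) if trend_dir > 0 else max(trend)
                    index = trend.index(elbow)
                    prev.extend(trend[1:index+1])
                    yield prev
                    prev = trend[index:]
                    prev_dir = trend_dir
            else:
                prev = trend
                prev_dir = trend_dir
            trend = [number]
    # Flush whatever is left, including a previous trend that ended
    # exactly on the last number
    if prev:
        prev.extend(trend[1:])
        yield prev
    elif len(trend) > 1:
        yield trend


data = [100, 99, 90, 79, 62, 94, 88, 75, 80, 72, 74, 87, 84, 90]
print(*trends(data))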
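Both versions return trend segments rather than the labelled printout asked for in the first post. A minimal sketch of the conversion, assuming each segment is a list whose first and last items are its turning points (the helper `turning_points` and the hard-coded `segments` below are illustrative):

```python
def turning_points(segments):
    """Convert trend segments into a list of (label, value) turning points."""
    points = []
    for n, segment in enumerate(segments):
        rising = segment[-1] > segment[0]
        if n == 0:
            # The very first value starts the first trend
            points.append(("trough" if rising else "peak", segment[0]))
        # Every trend ends on a peak (if rising) or a trough (if falling)
        points.append(("peak" if rising else "trough", segment[-1]))
    return points


# Segments of the kind the code above is meant to produce for the sample data
segments = [
    [100, 99, 90, 79, 62],
    [62, 94],
    [94, 88, 75, 80, 72],
    [72, 74, 87, 84, 90],
]
points = turning_points(segments)
for label, value in points:
    print(f"{label} = {value}")
```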
Posts: 11
Threads: 0
Joined: Nov 2023
Nov-20-2023, 10:01 AM
(This post was last modified: Nov-20-2023, 12:12 PM by Gribouillis.)
def find_peaks_troughs(values, threshold=0.10):
    """Return (peaks, troughs), ignoring counter-moves smaller than threshold."""
    if not values:
        return [], []
    peaks = []
    troughs = []
    # Running high and low since the last confirmed turning point
    peak = trough = values[0]
    # None until the first significant move sets the direction
    is_rising = None
    for value in values[1:]:
        peak = max(peak, value)
        trough = min(trough, value)
        if is_rising is not False and value / peak <= 1 - threshold:
            # Fell far enough from the running high: confirm it as a peak
            peaks.append(peak)
            trough = value
            is_rising = False
        elif is_rising is not True and value / trough >= 1 + threshold:
            # Rose far enough from the running low: confirm it as a trough
            troughs.append(trough)
            peak = value
            is_rising = True
    # Add the extreme of the final, still-open trend
    if is_rising is True:
        peaks.append(peak)
    elif is_rising is False:
        troughs.append(trough)
    return peaks, troughs

# List of values
values = [100, 99, 90, 79, 62, 94, 88, 75, 80, 72, 74, 87, 84, 90]

# Find peaks and troughs
peaks, troughs = find_peaks_troughs(values)

# Print results (this pairing assumes the series starts with a peak, as this one does)
for i, peak in enumerate(peaks):
    print(f"Peak {i + 1} = {peak}")
    if i < len(troughs):
        print(f"Trough {i + 1} = {troughs[i]}")

This code processes your list of values and prints the peaks and troughs as per your requirement. It tracks the running high and low, and toggles between looking for a peak and looking for a trough whenever the values move 10% or more away from the current extreme.