Python Forum
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
asyncio interval
#1
Hiya,

I have been working on a wildlife tracking project which reads in a stream of data from a Software Defined Radio (SDR - AirspyHF) and need to change a frequency every x seconds on the attached SDR (Its a kinda neat project, volunteer, non profit).

The current working main branch is here : https://github.com/bigalnz/test_fft/tree/main

The question is, how and where to put the code to change from frequency 1 to frequency 2 after x seconds, then change back again when x seconds is up.

For example if the program is started with :

--center 160425000 --alternate-freq 160500100 --freq-change-interval 80

Then it starts with freq 160425000 Hz and waits 80 seconds before changing to 160500100 and then after another 80 seconds swaps back.

The current code if I understand it correctly has a sample_config instance of the SampleConfig class inside sample_reader module : https://github.com/bigalnz/test_fft/blob...sdr.py#L28 and I think all I need to do is change sample_config.center_freq which would then update the SDR?

What would this timer code look like? Where should it go?

Sorry for all the basic questions but the existing code was not my work. If further clarification is required please let me know.
Reply
#2
Here's an async timer class you can use.
I set the time to 2 seconds, you can change to 60
call your frequency change routine from the start_event method.

import asyncio


class Timer:
    def __init__(self):
        self.interval_seconds = 2         # Delay in seconds
        self.nornal_freq = 160425000
        self.alternate_freq = 160500100
        self.altfreq_active = False
    
    async def timer(self, interval, callback):
        while True:
            await asyncio.sleep(interval)
            await self.start_event()

    async def start_event(self):
        if self.altfreq_active:
            print(f"Setting frequency to {self.nornal_freq}")
        else:
            print(f"Setting frequency to {self.alternate_freq}")
        self.altfreq_active = not self.altfreq_active

    async def main(self):
        await self.timer(self.interval_seconds, self.start_event()) # Trigger callback every 2 seconds

if __name__ == "__main__":
    tt = Timer()
    asyncio.run(tt.main())
Reply
#3
Thank you for the pseudo code. I have made a start at implementing this in my code, but a few questions remain in the context of my project.

Firstly I should have mentioned that alternate_freq and freq_change_interval are optional arguments. So I have defaulted freq_change_interval to 0 and test for this at the start.

common.py
@dataclass
class SampleConfig:
    sample_rate: float = 1.024e6
    """Sample rate"""

    center_freq: float = 160_425_000
    """Center frequency"""

    alternate_freq: float = 160_725_000
    """Center frequency to jump to when scan_interval given"""

    alternate_freq_active: bool = False
    """Boolean to track alt freq status"""

    freq_change_interval: float = 0
    """ Interval to change center freq in seconds"""

    read_size: int = 65536
    """Number of samples to read from the sdr in each iteration"""

    gain: str | float = 7.7
    """gain in dB"""

    bias_tee_enable: bool = False
    """Enable bias tee"""

    location: str = "anywhere"
    """Set population location"""

    scan_interval: int | None = None
    """Scan interval in minutes"""
Now in sample_reader.py can I simplify to this:

    if args.freq_change_interval > 0:
        async def timer(sample_config, callback):
            while True:
                await asyncio.sleep(sample_config.freq_change_interval)
                await self.start_event()
        
        async def start_event(sample_config):
            sample_config.alternate_freq, sample_config.center_freq = sample_config.center_freq, sample_config.alternate_freq # Do the swap
But this isnt quite right, as I no longer have a self, and while the module runs, the swap never happens.
Reply
#4
Put the timer class into it's own file, named (whatever uou want, but it's easier if class and module name are the same) Timer.py
Make modifications to the timer class as needed.
import into your main module with from Timer import Timer
instantiate in your module (at start) with
mytimer = Timer()
call methods with (for example) mytimer.timer(60, mycallback)
Reply
#5
Ok, I have created a timer.py with

import asyncio
from kiwitracker.common import SampleConfig
  
class Timer:

    sample_config: SampleConfig

    async def timer(sample_config, callback):
        while True:
            await asyncio.sleep(sample_config.freq_change_interval)
            await start_event()
 
    async def start_event(sample_config):
        sample_config.alternate_freq, sample_config.center_freq = sample_config.center_freq, sample_config.alternate_freq # Do the swap

    async def main(self):
        await self.timer(self.interval_seconds, self.start_event()) # Trigger callback every 2 seconds
But I still have the issue that I don't have a self, because I import sample_config which contains the frequencies and interval.
Reply
#6
you need an initialization routine in your sample config class.
This is where the timer class should be instantiated.
You could also create a third module which would act as a dispatcher.

It's late for me here (EDT time) I get up at 3 or 4 A.M. and go to bed by 10:00 P.M. (getting to be an old man, near 80), so I'll attempt to work this out tomorrow morning, and get back then.
Reply
#7
Ok - thank you.

If it helps, the SampleConfig class is defined in module common.py and the relevant bit is :

@dataclass
class SampleConfig:
    sample_rate: float = 1.024e6
    """Sample rate"""

    center_freq: float = 160_425_000
    """Center frequency"""

    alternate_freq: float = 160_725_000
    """Center frequency to jump to when scan_interval given"""

    alternate_freq_active: bool = False
    """Boolean to track alt freq status"""

    freq_change_interval: float = 0
    """ Interval to change center freq in seconds"""

    read_size: int = 65536
    """Number of samples to read from the sdr in each iteration"""

    gain: str | float = 7.7
    """gain in dB"""

    bias_tee_enable: bool = False
    """Enable bias tee"""

    location: str = "Ponui"
    """Set population location"""

    scan_interval: int | None = None
    """Scan interval in minutes"""
But I also do not understand the line:

mytimer.timer(60, mycallback) 
What is mycallback?
Reply
#8
bigal Wrote:But I also do not understand the line:

1

mytimer.timer(60, mycallback)
What is mycallback?
That was just an example.
You probably want the callback to be in the class that calls the timer, and name it whatever fits your code.
The callback in the example is just that, an example of a simple callback. you write one that fits your project.
It will be executed by an event (interrupt if you will) each time the timer reaches zero.
which module in the fft will be initiating the timer?
this is where the callback should go.

+++++++++++++++++++++++++++ Added at 10:03 EDT +++++++++++++++++++++++++++
I just looked at the application you referenced in GitHub (Of course I should have done this first)
It seems to me that the application has everything that you need, including the timer(s).

There's much more here than I want to (or have time to) get involved with, but a quick look using
a simple grep on the source grep -ri -m1 --include '*.py' --exclude-dir=venv asyncio.sleep .
shows the following (location where timer events occur):

Output:
./kiwitracker/sample_reader_rtlsdr.py: await asyncio.sleep(0.1) ./kiwitracker/sample_reader.py: await asyncio.sleep(0) ./kiwitracker/sample_reader_airspy.py: await asyncio.sleep(0) ./kiwitracker/sample_reader_dummy.py: await asyncio.sleep(0)
You can use the 'grep' command, modified to find other parts of the package.
Reply
#9
(Mar-27-2025, 01:24 AM)Larz60+ Wrote: (getting to be an old man, near 80),
Still going 💪


Can try something like this bigal.
import asyncio
from common import SampleConfig

async def swap_frequencies(config: SampleConfig):
    # Swap the center and alternate frequencies.
    config.alternate_freq, config.center_freq = config.center_freq, config.alternate_freq
    print(f"Swapped frequencies. New center_freq: {config.center_freq}, alternate_freq: {config.alternate_freq}")

async def frequency_timer(config: SampleConfig):
    # Run the swap at the interval specified in the config.
    while True:
        await asyncio.sleep(config.freq_change_interval)
        await swap_frequencies(config)

async def main(config: SampleConfig):    
    if config.freq_change_interval > 0:
        asyncio.create_task(frequency_timer(config))        
    while True:        
        await asyncio.sleep(1) 
        
if __name__ == '__main__':
    # Example: swap frequencies every 10 seconds.
    config = SampleConfig(freq_change_interval=10)
    asyncio.run(main(config)) 
If write test for it.
# pip install pytest-asyncio
import asyncio
import pytest
from sample_reader import swap_frequencies, frequency_timer
from common import SampleConfig

@pytest.mark.asyncio
async def test_swap_frequencies():  
    config = SampleConfig(center_freq=100.0, alternate_freq=200.0)
    await swap_frequencies(config)
    # After swapping, center should now be 200 and alternate should be 100.
    assert config.center_freq == 200.0
    assert config.alternate_freq == 100.0

@pytest.mark.asyncio
async def test_frequency_timer_once():
    # Set a short interval to test a single swap.
    config = SampleConfig(center_freq=100.0, alternate_freq=200.0, freq_change_interval=0.1)   
    task = asyncio.create_task(frequency_timer(config))
    # Wait a little longer than the interval to ensure at least one swap occurs.
    await asyncio.sleep(0.15)    
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    # Verify that frequencies have been swapped.
    assert config.center_freq == 200.0
    assert config.alternate_freq == 100.0 
Output:
λ pytest -v -s ..... tests/test_sample_reader.py::test_swap_frequencies Swapped frequencies. New center_freq: 200.0, alternate_freq: 100.0 PASSED tests/test_sample_reader.py::test_frequency_timer_once Swapped frequencies. New center_freq: 200.0, alternate_freq: 100.0 PASSED
Larz60+ likes this post
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  Mapping a value to an interval JazonKuer 12 4,950 Mar-17-2023, 07:59 PM
Last Post: Gribouillis
  Confidence interval after doing penalised cox regression HatemAli 0 1,632 Feb-23-2022, 11:02 PM
Last Post: HatemAli
  Complex X Tick Interval JoeDainton123 0 2,005 Oct-05-2020, 07:27 PM
Last Post: JoeDainton123
  Updating a matrix in a time interval inside a for loop vp1989 4 4,179 May-17-2020, 07:15 PM
Last Post: vp1989
  Assigning Data from one column to another with different associated timing interval alexafshari 1 2,580 Apr-30-2020, 03:59 PM
Last Post: pyzyx3qwerty
  adddate interval in python Steve42 2 3,063 Apr-16-2020, 08:24 PM
Last Post: Larz60+
  save data in .txt after certain interval Shaswat 1 2,860 Oct-13-2019, 07:07 AM
Last Post: Gribouillis
  How define iteration interval increment SriMekala 5 6,003 Jun-01-2019, 01:06 PM
Last Post: ichabod801
  Python data prep help for confidence interval eslearner 0 2,918 Feb-24-2018, 05:13 AM
Last Post: eslearner

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020