Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 77 additions & 8 deletions shadowsocks_manager/shadowsocks/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,17 +405,34 @@ def change_ips_softly(cls):
Updating the DNS record on the fly depends on the NameServer API, which you have to set up first in the domain app.
This is a feature of [aws-cfn-vpn](https://github.com/alexzhangs/aws-cfn-vpn).
"""
for node in cls.objects.filter(
for pk in cls.objects.filter(
is_active=True,
sns_endpoint__isnull=False,
sns_access_key__isnull=False,
sns_secret_key__isnull=False,
):
node.toggle_active() # make the node inactive
time.sleep(300) # assuming your DNS record TTL is 300 seconds
node.change_ip()
time.sleep(60) # AWS config capture takes time
node.toggle_active() # make the node active again
).values_list('pk', flat=True):
# Re-load each iteration so concurrent admin edits (e.g. cleared
# credentials, manual is_active toggle) aren't silently overwritten
# by stale in-memory state held across the 6-minute sleep.
node = cls.objects.get(pk=pk)
if not (node.is_active and node.sns_access_key and node.sns_secret_key):
continue
node.is_active = False
node.save()
try:
time.sleep(300) # wait for DNS TTL
node.refresh_from_db()
if not (node.sns_access_key and node.sns_secret_key):
continue
node.change_ip()
time.sleep(60) # AWS config capture
finally:
# Restore is_active regardless of what raised mid-cycle —
# otherwise an exception leaves the node stuck inactive and
# the next run sees it as ineligible.
node.refresh_from_db()
node.is_active = True
node.save()


class NodeAccount(StatisticMethod):
Expand Down Expand Up @@ -1016,8 +1033,60 @@ def update_by_account(sender, instance, **kwargs):
instance.on_update()


def _slack_webhook_url():
    """Return the Slack incoming-webhook URL, or '' if not configured.

    Resolution order:

    1. ``settings.SSM_SLACK_WEBHOOK_URL`` (whitespace-stripped), when it is
       non-empty.
    2. The stripped contents of the file ``$SSM_DATA_HOME/.slack-webhook``
       (``SSM_DATA_HOME`` defaults to ``/var/local/ssm``). The file is read
       on every call, so a URL can be dropped onto a running container
       without a restart.

    Returns:
        str: the webhook URL, or '' when neither source provides one.
    """
    from django.conf import settings
    import os

    url = (getattr(settings, 'SSM_SLACK_WEBHOOK_URL', '') or '').strip()
    if url:
        return url

    path = os.path.join(os.getenv('SSM_DATA_HOME', '/var/local/ssm'), '.slack-webhook')
    try:
        # Decode explicitly so the result does not depend on the locale.
        with open(path, encoding='utf-8') as f:
            return f.read().strip()
    except (OSError, ValueError):
        # A missing or unreadable file (OSError) or undecodable content
        # (UnicodeDecodeError is a ValueError) means "not configured".
        # Anything else is a programming error and should surface.
        return ''


def _notify_ip_change(node, old_ip, new_ip):
    """Best-effort Slack notification when a node's public IP changes (rotation).

    Fires for any public_ip change, scheduled or not, so an unexpected rotation
    is surfaced immediately. Never raises: a notification failure must not break
    Node.save(). Failures are logged as warnings instead.

    Args:
        node: the Node whose IP changed; ``node.name`` is used in the message
            and ``node.record.fqdn`` is appended when present.
        old_ip: the previous public IP.
        new_ip: the newly saved public IP.
    """
    try:
        # Resolve the webhook inside the try block so that a lookup failure
        # is also covered by the "never raises" contract.
        url = _slack_webhook_url()
        if not url:
            return

        import json
        import urllib.request

        rec = getattr(node, 'record', None)
        fqdn = ' (%s)' % rec.fqdn if rec is not None and getattr(rec, 'fqdn', None) else ''
        text = ':rotating_light: VPN node *%s* IP changed: `%s` -> `%s`%s' % (
            node.name, old_ip, new_ip, fqdn)
        req = urllib.request.Request(
            url, data=json.dumps({'text': text}).encode('utf-8'),
            headers={'Content-Type': 'application/json'}, method='POST')
        # Use the response as a context manager so the connection is closed
        # deterministically, even if reading the body fails.
        with urllib.request.urlopen(req, timeout=5) as resp:
            resp.read()
    except Exception as e:
        # Deliberate catch-all: this runs inside a post_save receiver and
        # must never propagate into Node.save().
        logger.warning('slack ip-change notify failed for %s: %s', node.name, e)


@receiver(post_save, sender=Node)
def update_by_node(sender, instance, created=False, **kwargs):
    """post_save receiver for Node: report IP rotations, then run on_update().

    Args:
        sender: the model class sending the signal (Node).
        instance: the Node that was just saved.
        created: True when the save inserted a new row; no notification is
            sent in that case because there is no prior IP to compare.
        **kwargs: remaining signal arguments, unused here.
    """
    # Notify on a real public-IP change (rotation). This relies on the
    # post-save snapshot in Node.save() running AFTER this receiver, so that
    # _original_public_ip still holds the prior IP here and can be compared
    # against the just-saved value.
    old_ip = getattr(instance, '_original_public_ip', None)
    new_ip = instance.public_ip
    if not created and old_ip and new_ip and old_ip != new_ip:
        _notify_ip_change(instance, old_ip, new_ip)
    instance.on_update()


Expand Down
12 changes: 11 additions & 1 deletion shadowsocks_manager/shadowsocks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ def port_heartbeat():
def node_change_ips():
return Node.change_ips()

# IMPORTANT: this task must NOT use acks_late / reject_on_worker_lost.
# change_ips_softly blocks the worker for ~6 min per node (DNS TTL + AWS Config
# wait). With late acks the message stays unacked for the whole run; the
# blocking sleep also lapses the AMQP heartbeat (see CELERY_BROKER_HEARTBEAT
# in settings), so the broker drops the connection and REDELIVERS the still-
# unacked message — turning one scheduled fire into an endless ~6-min rotation
# loop (the 2026 IP-rotation incident; observed redelivered=N/N in the queue).
# Early ack (the default) means the message is acked at delivery: a worker
# dying mid-run loses only that single run — recovered by the next schedule and
# the is_active finally-guard in change_ips_softly — instead of looping forever.
@shared_task(acks_late=False)
def node_change_ips_softly():
    """Celery task: run Node.change_ips_softly() and return its result."""
    return Node.change_ips_softly()
143 changes: 142 additions & 1 deletion shadowsocks_manager/shadowsocks/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import time
import botocore
from abc import abstractmethod
from django.test import TestCase
from unittest import mock
from django.test import TestCase, override_settings
from django.core.exceptions import ValidationError
from django.core.management import call_command

Expand Down Expand Up @@ -816,3 +817,143 @@ def test_node_no_change_no_cascade(self):
na.refresh_from_db()
self.assertTrue(na.is_active,
'a save that does not flip is_active should leave NodeAccount untouched')


class NodeChangeIpsSoftlyTestCase(AppTestCase):
    """Regression coverage for the hardened Node.change_ips_softly() loop.

    A soft rotation deactivates a node, waits out the DNS TTL, triggers the
    IP change and then reactivates the node. Two properties are pinned here:

    1. A failure anywhere in the cycle (for example in change_ip()) must
       still leave the node with is_active=True; a node stuck inactive
       would also fall out of the next run's eligibility filter.
    2. Nodes are re-loaded and re-checked inside the loop, so a node with
       blank SNS credentials is skipped rather than rotated.
    """

    def _make_node(self, name='soft-rotate-node', **kwargs):
        """Create a rotation-eligible Node; keyword arguments override defaults."""
        fields = {
            'public_ip': '192.0.2.10',  # TEST-NET-1, RFC 5737
            'private_ip': '10.255.255.254',
            'location': 'Unit Test',
            'sns_endpoint': 'arn:aws:sns:ap-northeast-1:0:topic',
            'sns_access_key': 'mock',
            'sns_secret_key': 'mock',
            'is_active': True,
        }
        fields.update(kwargs)
        return models.Node.objects.create(name=name, **fields)

    def test_restores_is_active_when_change_ip_raises(self):
        """A change_ip() failure mid-cycle must not strand the node inactive.

        The exception is expected to propagate out of change_ips_softly(),
        but only after is_active has been restored.
        """
        target = self._make_node()
        failing_change_ip = mock.patch.object(
            models.Node, 'change_ip', side_effect=RuntimeError('boom'))
        with mock.patch('shadowsocks.models.time.sleep'):
            with failing_change_ip as change_ip_mock:
                self.assertRaises(RuntimeError, models.Node.change_ips_softly)
        change_ip_mock.assert_called_once()
        target.refresh_from_db()
        self.assertTrue(
            target.is_active,
            'a change_ip() failure must not leave the node stuck inactive')

    def test_happy_path_keeps_node_active(self):
        """A successful run rotates the node exactly once and leaves it active."""
        rotated = self._make_node()
        with mock.patch('shadowsocks.models.time.sleep'):
            with mock.patch.object(models.Node, 'change_ip') as change_ip_mock:
                models.Node.change_ips_softly()
        change_ip_mock.assert_called_once()
        rotated.refresh_from_db()
        self.assertTrue(rotated.is_active)

    def test_skips_node_with_blank_credentials(self):
        """Blank (non-null) SNS credentials pass the queryset filter but are
        rejected by the in-loop guard: change_ip is never invoked and the
        node stays active."""
        skipped = self._make_node(
            name='blank-creds', sns_access_key='', sns_secret_key='')
        with mock.patch('shadowsocks.models.time.sleep'):
            with mock.patch.object(models.Node, 'change_ip') as change_ip_mock:
                models.Node.change_ips_softly()
        change_ip_mock.assert_not_called()
        skipped.refresh_from_db()
        self.assertTrue(skipped.is_active)


class NodeIpChangeSlackNotifyTestCase(AppTestCase):
    """Saving a Node with a new public_ip sends a best-effort Slack message."""

    def _node(self, name='slack-node', **kwargs):
        """Create an active Node; keyword arguments override the defaults."""
        fields = {
            'public_ip': '192.0.2.10',
            'private_ip': '10.255.255.254',
            'location': 'Unit Test',
            'is_active': True,
        }
        fields.update(kwargs)
        return models.Node.objects.create(name=name, **fields)

    @override_settings(SSM_SLACK_WEBHOOK_URL='https://hooks.slack.test/abc')
    def test_notifies_on_public_ip_change(self):
        """Exactly one request is sent and it names both the old and new IP."""
        subject = self._node()
        with mock.patch('urllib.request.urlopen') as urlopen_mock:
            subject.public_ip = '192.0.2.20'
            subject.save()
        self.assertEqual(urlopen_mock.call_count, 1)
        sent_request = urlopen_mock.call_args[0][0]
        payload = sent_request.data.decode('utf-8')
        self.assertIn('192.0.2.10', payload)  # previous IP
        self.assertIn('192.0.2.20', payload)  # new IP

    @override_settings(SSM_SLACK_WEBHOOK_URL='https://hooks.slack.test/abc')
    def test_no_notify_when_ip_unchanged(self):
        """A save that leaves public_ip alone sends nothing."""
        subject = self._node()
        with mock.patch('urllib.request.urlopen') as urlopen_mock:
            subject.location = 'somewhere else'  # public_ip is not touched
            subject.save()
        urlopen_mock.assert_not_called()

    def test_no_notify_when_webhook_unconfigured(self):
        """With no webhook URL resolved, an IP change sends nothing."""
        subject = self._node()
        with mock.patch('shadowsocks.models._slack_webhook_url', return_value=''):
            with mock.patch('urllib.request.urlopen') as urlopen_mock:
                subject.public_ip = '192.0.2.30'
                subject.save()
        urlopen_mock.assert_not_called()

    @override_settings(SSM_SLACK_WEBHOOK_URL='https://hooks.slack.test/abc')
    def test_notify_failure_does_not_break_save(self):
        """A network error while notifying must not prevent the save."""
        subject = self._node()
        broken_network = mock.patch(
            'urllib.request.urlopen', side_effect=OSError('network down'))
        with broken_network:
            subject.public_ip = '192.0.2.40'
            subject.save()  # must not raise
        subject.refresh_from_db()
        self.assertEqual(subject.public_ip, '192.0.2.40')


class CeleryRotationTaskConfigTestCase(TestCase):
    """Pin the task and broker settings that prevent the redelivery loop.

    change_ips_softly blocks the worker for about 6 minutes. With late acks
    or a live AMQP heartbeat, the broker drops the connection mid-run and
    redelivers the message, which loops forever (the 2026 IP-rotation
    incident).
    """

    def test_change_ips_softly_uses_early_ack(self):
        """The soft-rotation task must not be configured with acks_late."""
        from shadowsocks.tasks import node_change_ips_softly
        failure_message = (
            'node_change_ips_softly must early-ack: late-ack on a ~6 min blocking '
            'task causes infinite broker redelivery')
        self.assertFalse(node_change_ips_softly.acks_late, failure_message)

    def test_broker_heartbeat_disabled_and_single_prefetch(self):
        """The heartbeat must be disabled (0) and the prefetch multiplier 1."""
        from django.conf import settings
        heartbeat_message = (
            'a non-zero heartbeat lapses during the long blocking task and triggers '
            'connection-drop redelivery')
        self.assertEqual(settings.CELERY_BROKER_HEARTBEAT, 0, heartbeat_message)
        self.assertEqual(settings.CELERY_WORKER_PREFETCH_MULTIPLIER, 1)
17 changes: 17 additions & 0 deletions shadowsocks_manager/shadowsocks_manager/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,20 @@ def __repr__(self):
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# NOTE(review): the broker URL embeds the credentials guest:guest — confirm
# this is intended outside local/dev deployments.
CELERY_BROKER_URL = 'amqp://guest:guest@{}:{}//'.format(RABBITMQ_HOST, RABBITMQ_PORT)

# shadowsocks.tasks.node_change_ips_softly blocks the worker ~6 min per node
# (5 min DNS TTL + 1 min AWS Config wait). The default 120s AMQP heartbeat would
# lapse during that blocking sleep, so the broker drops the connection and
# REDELIVERS the in-flight message — turning one scheduled rotation into an
# endless ~6-min loop (the 2026 IP-rotation incident). Disable the heartbeat so a
# long blocking task can't kill the connection, and prefetch one task at a time so
# there are no prefetched siblings to requeue either.
# (Proper long-term fix: make change_ips_softly non-blocking via chained tasks
# with apply_async(countdown=...). Tracked as a follow-up.)
# Both values are asserted by CeleryRotationTaskConfigTestCase in
# shadowsocks/tests.py; update that test if they change.
CELERY_BROKER_HEARTBEAT = 0
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

# Optional Slack incoming-webhook URL for IP-rotation alerts (empty => disabled).
# Falls back to the file $SSM_DATA_HOME/.slack-webhook when this env var is unset,
# so a URL can be dropped onto a running container without a restart.
# The file fallback is implemented in shadowsocks.models._slack_webhook_url,
# not here; this setting only carries the env-var value.
SSM_SLACK_WEBHOOK_URL = config('SSM_SLACK_WEBHOOK_URL', default='')