Collection of my code. Includes: Parallelization --- Cactus: Shakersort --- Sunflower: Buckets --- Pumpkin: Queue --- Polyculture: random Chain or special-crop --- Maze: Multidrone Cheese
Intro
Steam guide for game version V1.0 updated as of 2025 Oct 19
This steam guide's thumbnail is completely unrelated. I just chose a funny square image that I had on my desktop. I love that meme.
I might have missed some steam text formatting such as [i] in-code becoming italics. Use your best judgement!
I'm enjoying the Oct 10 V1.0 update very much. I'm just uploading what I have currently.
I don't have any automation routines or meta-optimization algorithms for you. Just basic code.
I follow GitHub etiquette. My code is your code, and your code is my code, and we all steal, I mean, are inspired by each other.
I'm just a mediocre coder. Comment any fixes or improvements for me to steal, er, for readers to take note of.
Unfortunately I'm also not a beginner to programming. I won't be able to help you if you don't already know some semblance of Pythonic or pseudocoding.
Lastly, I'm guessing that the code in this game is a Pythonic custom DSL. A lot of syntactic sugar is missing, such as:
using backslashes to indicate a continued line of code
list comprehensions
nonlocal keyword
yield keyword and generator functions
argument unpacking (eg. *args / **kwargs, which I'll keep calling **args)
etc. I'm sure you'll find some missing too
Notes
TODO list for me
- DONE **args / Parallelism for non Axis-Aligned (AA) subtasks/drone-jobs using nonzero-arg subtasks which can handle inputs defined by Factory functions
- DONE Pumpkins parallelization
- DONE Polyculture parallelization
- Figure out how to detect the first-finished multiprocess drone job for the "long" parallelization strategy, instead of naively pulling the first-started drone job. This will improve speed for "long" strategy if the drone task does not have uniform job length / time taken.
- "finding" code for other parts of the game
Wish list to dev
- using "== None" ticks my brain, years "is None" down the drain
- Try/Exception
- Using backslash to indicate continue on newline of code
- List comprehensions
- yield keyword and generator functions
- Tuple unpacking when iterating over dicts (for k, v in dict:)
- global keyword accepts multiple variables at once
- OOP
Additional Notes
- The game uses wraparound movement. Treat the game board as an infinite tiling or a torus.
- My coding style is to be "generally safe", which includes stuff like function parameter checking, etc. This introduces additional if statements and code for the drone to slog through, so if you really care about speeding up code by a couple ticks I recommend you make a copy and remove those checks. If you want to use code for leaderboard running please do this.
- Whenever I use coordinates, I always use one argument (x, y) instead of two separate args x and y. It's a matter of personal preference I guess. (Also the function get_companion() returns the same type, a coordinate 2-tuple)
- Disclaimer: For multiprocessing/parallelization code, I only ever coded those with worldsize and n_drones = 32. I haven't really tested the other cases such as multidrone not unlocked, or weird drone-worldsize combinations.
Let us proceed fellow farmers.
Helpers
utils_num
def is_even(num):
    return num % 2 == 0

def is_odd(num):
    return not is_even(num)

def clamp(num, left, right):
    # Clamp a number between a given min and max.
    return min(max(num, left), right)

def ring_clamp(num, width):
    return num % width
utils_list
I commonly use a 1d list to keep track of information on the game board. 2d matrix is too complicated for the scope so I just flatten it. You'll see me call this "serialized" version or something.
def all(ls):
    for x in ls:
        if not x:
            return False
    return True

def any(ls):
    for x in ls:
        if x:
            return True
    return False

def index(ls, item):
    for i in range(len(ls)):
        if ls[i] == item:
            return i
    return None # TODO raise exception

def max_with_index(ls):
    # Args:
    #   ls: list
    # Returns:
    #   value: maximum item
    #   index: index of the maximum item
    if not ls:
        return None, -1
    max_val = ls[0]
    max_idx = 0
    for i in range(1, len(ls)):
        cur_val = ls[i]
        if cur_val > max_val:
            max_val = cur_val
            max_idx = i
    return max_val, max_idx

def extend(ls1, ls2):
    # Extend ls1 with ls2's values.
    for x in ls2:
        ls1.append(x)

def reversed(ls):
    # Return a reversed copy of the list.
    res = []
    for i in range(len(ls) - 1, -1, -1):
        res.append(ls[i])
    return res

def enumerate_list(ls):
    # Returns a list of (idx, val) pairs
    i = 0
    result = []
    for item in ls:
        result.append((i, item))
        i = i + 1
    return result
def serialize_2d_index(idx, dim_len):
    # Args:
    #   idx: 2-tuple
    #   dim_len: int length of 0th dimension
    # Returns:
    #   int: serialized index
    return idx[1] * dim_len + idx[0]

def deserialize_2d_index(idx, dim_len):
    # Args:
    #   idx: int
    #   dim_len: int length of 0th dimension
    # Returns:
    #   2-tuple: 2d matrix index
    return idx % dim_len, idx // dim_len

def serialize_2d_matrix(m):
    # Flattens a 2d matrix into a list.
    # Args:
    #   m: List[List]
    # Returns:
    #   list: flattened 1d matrix
    res = []
    for i in range(len(m)):
        extend(res, m[i])
    return res

def deserialize_2d_matrix_CC(m, dim_len):
    # Rebuilds 2d matrix from 1d.
    # Uses the Contiguous Chunks strategy.
    # Args:
    #   m: List
    #   dim_len: int 0th dim target length (number of rows)
    # Returns:
    #   list[list]: 2d matrix or None if invalid
    if dim_len <= 0:
        return None
    n = len(m)
    if n % dim_len != 0:
        return None # must divide evenly for strict reshape
    # build dim_len rows
    cols = n // dim_len
    res = []
    i = 0
    while i < dim_len:
        res.append([])
        i = i + 1
    # fill row-major (contiguous chunks)
    i = 0
    while i < n:
        row_idx = i // cols
        res[row_idx].append(m[i])
        i = i + 1
    return res
def deserialize_2d_matrix_unsafe_CC(m, dim_len):
    # Rebuilds a 2D matrix from a 1D list,
    # distributing items as evenly as possible.
    # Uses the Contiguous Chunks strategy.
    # Args:
    #   m: list
    #   dim_len: int 0th dim target length (number of rows)
    # Returns:
    #   list[list]: 2D "best-effort" matrix distributed CC
    if dim_len <= 0:
        return [m]
    n = len(m)
    if n == 0:
        temp = []
        for _ in range(dim_len):
            temp.append([])
        return temp
    # Compute base size and remainder
    # (extra elements to distribute)
    rows = dim_len
    base = n // rows
    remainder = n % rows
    res = []
    idx = 0
    i = 0
    while i < rows:
        # Give one extra element to
        # the first `remainder` rows
        if i < remainder:
            extra = 1
        else:
            extra = 0
        row_len = base + extra
        row = m[idx:idx + row_len]
        res.append(row)
        idx += row_len
        i += 1
    # If some elements remain (shouldn't
    # happen), append them all to
    # the last row.
    # Fixed: this used to slice with `index` (the helper
    # function above) instead of the running offset `idx`.
    if idx < n:
        extend(res[-1], m[idx:])
    return res
def deserialize_2d_matrix_RR(m, dim_len):
    # Rebuilds 2d matrix from 1d.
    # Uses the Round Robin strategy.
    # Args:
    #   m: List
    #   dim_len: int 0th dim target length (number of rows)
    # Returns:
    #   list[list]: 2d matrix or None if invalid
    if dim_len <= 0:
        return None
    # build exactly dim_len rows
    # NOTE update when list comps
    res = []
    i = 0
    while i < dim_len:
        res.append([])
        i = i + 1
    n = len(m)
    if n == 0:
        return res
    if n % dim_len != 0:
        return None # must divide evenly for strict reshape
    # round robin placement
    for idx, item in enumerate_list(m):
        res[idx % dim_len].append(item)
    return res
def deserialize_2d_matrix_unsafe_RR(m, dim_len):
    # Rebuilds a 2D matrix from a 1D list,
    # distributing items as evenly as possible.
    # Uses the Round Robin strategy.
    #
    # Args:
    #   m: list
    #   dim_len: int 0th dim target length (number of rows)
    # Returns:
    #   list[list]: 2D "best-effort" matrix distributed RR
    if dim_len <= 0:
        return [m]
    n = len(m)
    # NOTE update when list comps [[] for _ in range(dim_len)]
    res = []
    for _ in range(dim_len):
        res.append([])
    if n == 0:
        return res
    # round robin placement
    for idx, item in enumerate_list(m):
        res[idx % dim_len].append(item)
    return res
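To make the difference between the two strategies concrete, here is a minimal plain-Python sketch (not the in-game DSL) that splits 7 jobs across 3 rows both ways. The names split_contiguous and split_round_robin are made up for this illustration; they mirror the "unsafe" CC and RR helpers above but use list comprehensions and divmod, which the game does not necessarily support.

```python
def split_contiguous(items, rows):
    # Contiguous chunks: the first `remainder` rows get one extra item.
    base, remainder = divmod(len(items), rows)
    out = []
    start = 0
    for r in range(rows):
        size = base + (1 if r < remainder else 0)
        out.append(items[start:start + size])
        start += size
    return out


def split_round_robin(items, rows):
    # Round robin: item i goes to row i % rows.
    out = [[] for _ in range(rows)]
    for i, item in enumerate(items):
        out[i % rows].append(item)
    return out


jobs = list(range(7))
cc = split_contiguous(jobs, 3)
rr = split_round_robin(jobs, 3)
print(cc)  # [[0, 1, 2], [3, 4], [5, 6]]
print(rr)  # [[0, 3, 6], [1, 4], [2, 5]]
```

Contiguous chunks keep neighbouring jobs with the same worker, while round robin interleaves them. Both hand the leftover job to the first row.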
utils_move
For some reason, you can't import "hidden" functions (names starting with underscore) like with Python. Thus my "_goto_y()" will just be "goto_y()".
from utils_num import *
from utils_list import *
from utils_drone import *

def get_pos():
    return (get_pos_x(), get_pos_y())

def _shortest_delta(curr, dest, size):
    # Signed step count from curr to dest on a ring of
    # length size, taking the shorter way around.
    d = (dest - curr) % size
    if d > size / 2:
        d -= size
    return d

def goto_x(tx):
    ws = get_world_size()
    cx = get_pos_x()
    dx = _shortest_delta(cx, tx, ws)
    if dx > 0:
        for _ in range(dx):
            move(East)
    elif dx < 0:
        for _ in range(-dx):
            move(West)
    return True

def goto_y(ty):
    ws = get_world_size()
    cy = get_pos_y()
    dy = _shortest_delta(cy, ty, ws)
    if dy > 0:
        for _ in range(dy):
            move(North)
    elif dy < 0:
        for _ in range(-dy):
            move(South)
    return True

def goto(target):
    # The drone makes use of wraparound.
    # Args:
    #   target: Tuple of length 2
    # Returns:
    #   bool: Success state
    tx, ty = target
    goto_x(tx)
    goto_y(ty)
    return True

def u_turn(dir):
    # Given a direction, return the
    # opposite direction
    if dir == North:
        return South
    elif dir == South:
        return North
    elif dir == East:
        return West
    elif dir == West:
        return East
    else:
        return None
def calculate_p_AA_positions(axis):
    # Calculates a list of all positions
    # across an axis, ordered farthest
    # distance first, inbetweens
    # alternating direction, and
    # closest distance last.
    # For the use of reaching the
    # farthest point first when starting
    # an axis aligned drone task.
    # Args:
    #   axis: "x" or "y"
    # Returns:
    #   list[int]
    recompute_ws()
    positions = []
    if axis == "x":
        temp_pos = get_pos_x()
    else:
        temp_pos = get_pos_y()
    farthest = temp_pos - (ws // 2)
    if farthest < 0:
        farthest += ws
    for i in range(1, ws+1):
        if is_odd(i):
            d = i // 2
        else:
            d = - i // 2
        positions.append(ring_clamp(farthest + d, ws))
    return positions
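A worked example of the ordering that calculate_p_AA_positions() produces, as a plain-Python sketch. The function name farthest_first_positions and its two parameters are stand-ins for this illustration; in the game the current position and world size come from get_pos_x()/get_pos_y() and get_world_size().

```python
def farthest_first_positions(current, world_size):
    # Start half a board away from `current`, then alternate
    # +0, -1, +1, -2, +2, ... around that point, wrapping with modulo.
    farthest = (current - world_size // 2) % world_size
    positions = []
    for i in range(1, world_size + 1):
        if i % 2 == 1:
            d = i // 2
        else:
            d = -(i // 2)
        positions.append((farthest + d) % world_size)
    return positions


order = farthest_first_positions(current=0, world_size=8)
print(order)  # [4, 3, 5, 2, 6, 1, 7, 0]
```

The drone standing at 0 hands out the far row 4 first and its own row 0 last, so the longest trips start earliest.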
utils_item
def check_has_item(item, num=1):
    # DEPRECATED use can_pay_for()
    # Args:
    #   item: obj under class Items
    #   num: number to check
    if num_items(item) < num:
        return False
    return True

def can_pay_for(entity, num=1):
    # Checks if I can pay for the
    # desired item(s).
    costs = get_cost(entity)
    if not costs:
        return True
    for k in costs:
        v = costs[k]
        if not check_has_item(k, v * num):
            return False
    return True
Helpers Pt 2
utils_grow
from utils_item import *
from utils_num import *

# Used for logic in polyculture
entity_growth_timing = {# Entity: tuple[float, float], in seconds
    Entities.Apple: (0.2, 0.2),
    Entities.Bush: (3.2, 4.8),
    Entities.Cactus: (1.0, 1.0),
    Entities.Carrot: (4.8, 7.2),
    Entities.Dead_Pumpkin: (0, 0),# JIC
    Entities.Dinosaur: (0.18, 0.22),
    Entities.Grass: (0.5, 0.5),
    Entities.Hedge: (0, 0),# JIC
    Entities.Pumpkin: (0.2, 3.8),
    Entities.Sunflower: (5.6, 8.4),
    Entities.Treasure: (0, 0),# JIC
    Entities.Tree: (5.6, 8.4),
}

# Note the trailing comma in the one-element tuples:
# (Grounds.Soil) is just Grounds.Soil, (Grounds.Soil,) is a tuple.
entity_allowed_groundtype = {# Entity: tuple containing allowed ground types
    Entities.Apple: (Grounds.Grassland, Grounds.Soil),
    Entities.Bush: (Grounds.Grassland, Grounds.Soil),
    Entities.Cactus: (Grounds.Soil,),
    Entities.Carrot: (Grounds.Soil,),
    Entities.Dead_Pumpkin: (Grounds.Soil,),# assign as if pumpkin
    Entities.Dinosaur: (Grounds.Grassland, Grounds.Soil),
    Entities.Grass: (Grounds.Grassland, Grounds.Soil),
    Entities.Hedge: (),# JIC.
    Entities.Pumpkin: (Grounds.Soil,),
    Entities.Sunflower: (Grounds.Soil,),
    Entities.Treasure: (),# JIC.
    Entities.Tree: (Grounds.Grassland, Grounds.Soil),
}

entity_needs_soil = {
    # Exclusively needs soil.
    Entities.Cactus,
    Entities.Carrot,
    Entities.Pumpkin,
    Entities.Sunflower,
}

entity_needs_grassland = set()# Exclusively needs grassland.
def calculate_actual_growth_time():
    # Account for water level. Going by the formula below,
    # growth speed scales linearly from 1x at water level 0
    # to 5x at water level 1, so time = base / (1 + 4 * water).
    # Uses the lower bound of the entity's growth-time range.
    ent = get_entity_type()
    if ent == None or ent not in entity_growth_timing:
        return 0
    return entity_growth_timing[ent][0] * (1 / (1 + 4 * get_water()))

def maintain_water(thresh):
    # Args:
    #   thresh: float threshold to maintain above
    # Returns:
    #   bool: Success state
    if not check_has_item(Items.Water):
        return False
    thresh = clamp(thresh, 0, 1)
    while get_water() < thresh:
        use_item(Items.Water)
    return True

def till_for_entity(ent):
    # Prepare the ground for planting.
    # Simpler version but less safe:
    #   if get_ground_type() not in entity_allowed_groundtype[ent]:
    #       till()
    #
    ground = get_ground_type()
    if ent in entity_needs_soil and ground != Grounds.Soil:
        till()
    elif ent in entity_needs_grassland and ground != Grounds.Grassland:
        till()
    return True
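A quick numeric check of the water adjustment in calculate_actual_growth_time(), written as plain Python. The function watered_growth_time is a made-up name for this sketch, and the 1 + 4 * water speed factor is taken from the formula in the code above rather than verified against the game.

```python
def watered_growth_time(base_seconds, water_level):
    # Assumption (from the formula above): speed = 1 + 4 * water_level,
    # with the water level clamped to [0, 1].
    water_level = min(max(water_level, 0.0), 1.0)
    return base_seconds / (1 + 4 * water_level)


dry = watered_growth_time(4.8, 0.0)    # carrot lower bound, no water
wet = watered_growth_time(4.8, 0.75)   # same crop at WATER_THRESH = 0.75
print(dry, wet)
```

At a water level of 0.75 the divisor is 4, so a 4.8 second crop drops to about 1.2 seconds. That is why the scripts below top up water before fertilizer.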
utils_farm
from utils_drone import *
from utils_move import *

def _factory_till_row(args_tuple):
    start_pos = args_tuple
    def _subtask_till_row():
        recompute_ws()
        goto(start_pos)
        for _ in range(ws):
            if get_ground_type() != Grounds.Soil:
                till()
            move(East)
        return True
    return _subtask_till_row

def _subtask_harvest_eastwards():
    recompute_ws()
    for _ in range(ws):
        harvest()
        move(East)
    return True

def till_all():
    recompute_parallelism()
    px = get_pos_x()
    args = []
    positions = calculate_p_AA_positions("y")
    for i in range(ws):
        args.append((px, positions[i]))
    return drone_task(_factory_till_row, args)

def harvest_all():
    # Use in case you planted an expensive field of crops.
    recompute_parallelism()
    return drone_naive_AA_task(_subtask_harvest_eastwards, North)

def randcoords():
    recompute_ws()
    return ((random() * ws) // 1, (random() * ws) // 1)

def get_companion_saferand():
    # Returns a random coordinate if
    # get_companion() fails.
    ret = (get_companion(),)
    if ret[0] == None:
        return None, randcoords()
    else:
        return ret[0][0], ret[0][1]
utils_tick
def naive_wait(ticks):
    # Wait for ticks ticks.
    # This function is unsafe.
    # This function has overhead (adjusted).
    #   1: Evaluate ticks - 3
    #   1: Start for loop
    #   1: IDK
    for _ in range(ticks - 3):
        pass
Parallelization
If you don't have any experience with parallelization specifically multiprocessing as a means to achieve parallelization, then this part will be quite hard. Thankfully for me, I'm in uni and have taken an OS course which covered scheduling, multiprocessing, related topics etc so at least I'm not hopeless. (What the hell is a semaphore haha)
I wrote most of my parallelization functions in hopes that they can be used independently of Megafarm unlock level. I'm not sure if it works though, I haven't tried to simulate it. (eg. Hopefully, prevents using the function max_drones() if megafarm hasn't been unlocked, as that will raise errors).
Disclaimer, Caution: I've only tried all my parallelization code on the "short" mode with worldsize = max_drones() = 32.
I finally have a solution for **args-type subtasks!!! I sincerely hope that the drone_task function can help, I spent a lot of time on that. The way I deal with it is to treat **args as a tuple object containing m arguments to pass in. Then any subtask should be defined as taking in one tuple parameter, and can access parameters through iterating the tuple. Doesn't this remind you of how to use the Stack when programming in Assembly? :D
Also, this is the one time I've unironically used the factory design pattern. Thank you to this game for making me write "clean" code. Examples for how to make factory functions can be found in my implementations for polyculture, sunflowers, etc.
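Here is the tuple-plus-factory idea boiled down to a plain-Python sketch, in case the description above is too abstract. Everything in it is a stand-in: factory_till_row only returns a string instead of moving a drone, and run_jobs_serially plays the role of drone_task() in "none" mode (no spawn_drone() involved).

```python
def factory_till_row(args_tuple):
    # args_tuple packs every argument the job needs; here (x, y).
    start_x, start_y = args_tuple

    def job():
        # Zero-argument closure: it remembers start_x/start_y,
        # so whoever runs it does not need to pass anything in.
        return "till row starting at (" + str(start_x) + ", " + str(start_y) + ")"

    return job


def run_jobs_serially(factory, args_list):
    # Build each zero-arg job from its tuple, then call it.
    results = []
    for args in args_list:
        results.append(factory(args)())
    return results


results = run_jobs_serially(factory_till_row, [(0, 0), (0, 1), (0, 2)])
print(results[1])  # till row starting at (0, 1)
```

The point of the pattern is that the thing handed to the drone takes no parameters at all. All the inputs are baked in when the factory is called.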
utils_drone
# A solution to **args is to design the subtask to accept a tuple parameter.
def max_drones_safe():
    # Requires having unlocked Auto Unlock.
    if num_unlocked(Unlocks.Megafarm) < 1:
        return 1
    else:
        return max_drones()

# Drone parallelism
def choose_parallelism(ws):
    # Recompute and decide how the main drone participates in parallel work.
    #
    # My definition of parallelism assumes the work is split
    # along one dimension of the game board (one strip per job).
    #
    # Heuristic:
    # - If Megafarm is locked: no parallelism ("none").
    # - Prefer "short" (main drone is a worker) when:
    #   - The field width divides evenly among all drones, or
    #   - The total number of drones is small relative to worldsize.
    #   - However this assumes subtasks complete in roughly uniform
    #     time, and that the n_drones*200ticks time overhead nearing
    #     all subtasks' completion is acceptable. It could cause idle
    #     drones or imbalance if subtask duration is random.
    # - Prefer "long" (main drone only delegates tasks) when:
    #   - Removing the main drone yields an even partition, or
    #   - The division is cleaner (greater GCD) without the main.
    #   - There is a job requirement to proactively respawn drones
    #     or subtask length follows a non-uniform distribution.
    #   - But I haven't found a solution to computing the first
    #     job-completed drone task to efficiently manage drones
    #     before they become idle. (TODO)
    # This balances even workload distribution with minimal idle time.
    # Megafarm not unlocked → no parallelism
    if get_cost(Unlocks.Megafarm) == get_cost(Unlocks.Megafarm, 0):
        return "none"
    n = max_drones()
    # Safety. If only main drone exists: it must work
    if n <= 1:
        return "short"
    # If we have more drones than dimension, either mode will work.
    # "long" somehow seems safer. I can't explain.
    if n > ws:
        return "long"
    # If workforce is small, subtask-ending-overhead shouldn't be large
    if n <= 4:
        return "short"
    # Prefer an even partition using all workers (main included)
    if ws % n == 0:
        return "short"
    # If not even, see if excluding the main gives an even split
    if ws % (n - 1) == 0:
        return "long"
    # If workforce is small vs width, every hand helps
    if (n - 1) < ws // 4:
        return "short"
    # Fallback: pick the mode that yields fewer ragged strips (GCD-based)
    def _gcd(a, b):
        while b:
            a, b = b, a % b
        return a
    g_all = _gcd(ws, n)
    g_nomin = _gcd(ws, n - 1)
    # Larger GCD → cleaner tiling. On a tie this keeps the
    # main drone as a worker ("short").
    if g_all >= g_nomin:
        return "short"
    else:
        return "long"
def recompute_ws():
    global ws
    ws = get_world_size()

def recompute_parallelism(new_type=None):
    # Corner case against runtime upgrades of worldsize or n_drones.
    # Call at the start of big entry points like till_all() or maintask().
    # Args:
    #   new_type: manual application of one of "none", "short", "long"
    recompute_ws()
    global parallelism_type
    if new_type == None:
        parallelism_type = choose_parallelism(ws)
    else:
        parallelism_type = new_type
def drone_task(factory, args, local_p_type=None):
    # This function is for usage with a "N independent jobs" pattern.
    # Convention:
    #   Ensure subtasks are "closed" w.r.t. starting position. This is
    #   in case of the "short" mode needing main drone to be a worker.
    #   If not, ensure so externally around your function call.
    # Args:
    #   factory: factory function taking one m-tuple of parameters
    #       which can cleanly create zero-arg subtasks.
    #   args: list of m-tuple of arguments to pass into factory.
    #   (implied) length: number of jobs. Inferred as n=len(args)
    #   local_p_type: local scope parallelism_type.
    #       Useful for second-order delegators.
    # Returns:
    #   results: list of n results
    length = len(args)
    if length == 0: # Trivial
        return []
    if local_p_type != None:
        p_type_to_use = local_p_type
    else:
        # global keyword not needed for readonly
        p_type_to_use = parallelism_type
    _drones = []
    for _ in range(length):
        _drones.append(None)
    results = []
    for _ in range(length):
        results.append(None)
    if p_type_to_use == "none":
        # Does not account for drone parallelism.
        for i in range(length):
            res = factory(args[i])()
            results[i] = res
        return results
    elif p_type_to_use == "short":
        # Uses the main drone as a worker.
        for i in range(length):
            subtask_watcher = spawn_drone(factory(args[i]))
            if subtask_watcher:
                _drones[i] = subtask_watcher
            else:
                res = factory(args[i])()
                results[i] = res
        for i in range(len(_drones)):
            if _drones[i] != None:
                results[i] = wait_for(_drones[i])
        return results
    elif p_type_to_use == "long":
        # Uses the main drone as a task delegator.
        # Start from an empty queue: the None-padded list built
        # above is only meaningful for "short" mode.
        _drones = []
        _drones_idx = []# Keeps track of index w.r.t. results list.
        # Assumption (untested, I only run "short"): max_drones()
        # counts the main drone, so at most max_drones() - 1
        # workers can be in flight at once.
        capacity = max(max_drones() - 1, 1)
        counter, limit = 0, length
        while counter < limit:
            if len(_drones) < capacity:
                subtask_watcher = spawn_drone(factory(args[counter]))
                if subtask_watcher:
                    _drones.insert(0, subtask_watcher)
                    _drones_idx.insert(0, counter)
                    counter += 1
                else:
                    # n_drones under capacity but spawn_drone() failed.
                    quick_print("Error: unexpected spawn_drone() failure")
                    return None
            if len(_drones) >= capacity:
                # TODO corner case for non-uniform subtask timing
                # Detect the first-job-exited drone instead of
                # the first-job-started drone
                idx = _drones_idx.pop()
                res = wait_for(_drones.pop())
                results[idx] = res
        while _drones:
            idx = _drones_idx.pop()
            res = wait_for(_drones.pop())
            results[idx] = res
        return results
    else:
        quick_print("Error: Unknown parallelism type:", p_type_to_use)
        return None
# Insert Axis Aligned Drone Task code here because the code section was too long to put into 1 steamguide section

if __name__ != "__main__":
    ws = get_world_size()
    parallelism_type = choose_parallelism(ws)
    quick_print("parallelism_type:", parallelism_type)
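If you want to see what the choose_parallelism() heuristic decides without launching the game, here is a plain-Python port of the same decision ladder. The parameters n_drones and megafarm_unlocked are hypothetical stand-ins for max_drones() and the Megafarm unlock check, and math.gcd replaces the hand-rolled _gcd.

```python
from math import gcd


def choose_parallelism_sketch(ws, n_drones, megafarm_unlocked=True):
    # Same order of checks as choose_parallelism(), with the
    # game calls replaced by plain arguments.
    if not megafarm_unlocked:
        return "none"
    if n_drones <= 1:
        return "short"
    if n_drones > ws:
        return "long"
    if n_drones <= 4:
        return "short"
    if ws % n_drones == 0:
        return "short"
    if ws % (n_drones - 1) == 0:
        return "long"
    if (n_drones - 1) < ws // 4:
        return "short"
    if gcd(ws, n_drones) >= gcd(ws, n_drones - 1):
        return "short"
    return "long"


for ws, n in [(32, 32), (32, 33), (10, 6), (22, 8), (12, 10)]:
    print(ws, n, choose_parallelism_sketch(ws, n))
```

For the setup I actually run (worldsize 32 with 32 drones) this lands on "short", which matches the disclaimer above that "short" is the only mode I have really exercised.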
Parallelization Pt 2
This code for parallelization is pretty old. You should probably use the generalized drone_task instead.
def drone_naive_AA_task(subtask, dir, length=None):
    # OLD CODE use drone_task() instead
    # Starts a naive Axis-Aligned subtask along an axis.
    # Make sure the drone is in the right place before starting.
    # Movement assumes drone can wraparound the game board.
    # Convention:
    #   TODO while I figure out **args, subtasks cannot take inputs.
    #   Ensure subtasks are "closed" w.r.t. starting position. This is
    #   in case of the "short" mode needing main drone to be a worker.
    #   If not, ensure so externally around your function call.
    #   Subtasks returning bool are success states. False = Failure
    #   TODO take care of subtasks returning results. I havent figured
    #   out what to do if a subtask ran successfully but returns 0 results,
    #   as that is a Falsy evaluation.
    # Example:
    #   Shakersorting along one dimension on the Cactus puzzle
    # Args:
    #   subtask: name of function to perform
    #   dir: Cardinal direction
    #   length: number of steps along dir (defaults to ws)
    #   The mode ("none", "short" or "long") is read from the
    #   global parallelism_type.
    # Returns:
    #   bool: success state # Is the reason for naive.
    if length == 0:# Trivial
        return True
    elif length == None:
        # "== None" looks so wrong. But "is None" doesn't exist in
        # the in-game Pythonic DSL.
        # Putting length=ws as default param binds it at import time.
        # Recalculate here to take care of runtime ws-upgrade increases.
        length = ws
    _drones = []
    if parallelism_type == "none":
        # Does not account for drone parallelism.
        for _ in range(length):
            if not subtask():
                return False
            move(dir)
        return True
    elif parallelism_type == "short":
        # Uses the main drone as a worker.
        for _ in range(length):
            subtask_watcher = spawn_drone(subtask)
            if subtask_watcher:
                _drones.append(subtask_watcher)
            else:
                if not subtask():
                    return False
            move(dir)
        while _drones:
            if not wait_for(_drones.pop()):
                return False
        return True
    elif parallelism_type == "long":
        # Uses the main drone as a task delegator.
        # Assumption (untested, I only run "short"): max_drones()
        # counts the main drone, so at most max_drones() - 1
        # workers can be in flight at once.
        capacity = max(max_drones() - 1, 1)
        counter, limit = 0, length
        while counter < limit:
            if len(_drones) < capacity:
                subtask_watcher = spawn_drone(subtask)
                if subtask_watcher:
                    _drones.insert(0, subtask_watcher)
                    counter += 1
                    move(dir)
                else:
                    # n_drones under capacity but spawn_drone() failed.
                    quick_print("Error: unexpected spawn_drone() failure")
                    return False
            if len(_drones) >= capacity:
                if not wait_for(_drones.pop()):# TODO detect the first
                    # exited drone instead of first-job-started drone
                    return False
        while _drones:
            if not wait_for(_drones.pop()):
                return False
        return True
    else:
        print("Error: Unknown parallelism type:", parallelism_type)
        return False
Basic Crops
Left as an exercise for the reader :)
Important: Always prioritize polyculture. Only use naive strategies like this if you need an amount of a certain crop and you haven't unlocked PC.
Polyculture is exclusively an output multiplier and it does not apply a proportionate cost penalty. It drastically increases the ratio of harvest reward to entity-planting cost!
Time efficiency improves as a result. To match the output of a polyculture strategy, a non-polyculture strategy has to harvest (PC multiplier) times as many entities. Put the other way around: for the same output, a polyculture-enabled strategy needs 1/(PC mult) of the harvests, and so roughly 1/(PC mult) of the time.
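The arithmetic behind that claim, as a tiny plain-Python sketch. All numbers here are made up for illustration (a base yield of 10 per harvest and a multiplier of 5); check your own unlock levels for the real values.

```python
def harvests_needed(target_amount, base_yield, pc_multiplier=1):
    # Ceiling division: how many harvests until we reach the target.
    per_harvest = base_yield * pc_multiplier
    return -(-target_amount // per_harvest)


plain = harvests_needed(10000, base_yield=10)
boosted = harvests_needed(10000, base_yield=10, pc_multiplier=5)
print(plain, boosted)  # 1000 200
```

Same planting cost per tile, one fifth of the harvests. That is the whole argument for prioritizing polyculture.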
Grass
Naive
naive double for loop
Bushes
Naive
naive double for loop
Carrots
Naive
naive double for loop
Parallelized
p_naive_carrots
from utils_list import *
from utils_drone import *
from utils_grow import *
from utils_farm import *
from utils_move import *
from utils_item import *

WATER_THRESH = 0.75

def stop_condition():
    # Specify your stopping condition here.
    return num_items(Items.Carrot) > 10000000000

def _factory_carrots_row(args_tuple):
    # args_tuple: (row_index,)
    row_index = args_tuple[0]
    def job():
        start = get_pos()
        while not stop_condition():
            goto((0, row_index))
            for _ in range(ws):
                if can_harvest():
                    harvest()
                plant(Entities.Carrot)
                maintain_water(WATER_THRESH)
                move(East)
        goto(start)
        return True
    return job

def maintask():
    recompute_parallelism()
    args = []
    i = 0
    while i < ws:
        args.append((i,))
        i += 1
    drone_task(_factory_carrots_row, args)
    return True

if __name__ == "__main__":
    clear()
    till_all()
    maintask()
Trees
Naive
clear()
ws = get_world_size()
# Plant trees in a checkerboard pattern.
for i in range(ws):
    for j in range(ws):
        if (i % 2 == 1 and j % 2 == 0) or (i % 2 == 0 and j % 2 == 1):
            plant(Entities.Tree)
        move(East)
    move(North)
while True:
    for i in range(ws):
        for j in range(ws):
            curr_ent = get_entity_type()
            if curr_ent == Entities.Tree and can_harvest():
                harvest()
                plant(Entities.Tree)
            move(East)
        move(North)
Parallelized
Left as an exercise to the reader.
General Polyculture (Parallelization)
I just want to say that I'm thankful for school for teaching me clean code and what a Factory design pattern even is, because the more I use it for updating my old parallelization code, the more thankful I get.
Notes
As far as I know, "polyculture" is a yield multiplier for a planted crop (the "target"). When you plant the target it requests another specific crop (the "companion") at a specific location, and the multiplier only activates if that companion exists there. The companion's growth status and infection status don't matter; it just has to exist.
The request range is limited (in-game docs say the range is 3). In addition, polyculture does not apply a proportionate planting-cost penalty.
Additional (future) implementation idea
Drones work in pairs. One plants the start of a chain of companions, and one harvests the end of that chain of companions. I'm unsure of how to proceed with this though, because I don't want to deal with all the race conditions I imagine must be plaguing the scenario.
As always, I leave it as an exercise to the reader.
Naive / chain-random
Here is my solution for polyculture. Thanks to >ANY KEY_'s guide for the chain-random idea.
The code works on the principle that we plant chains of companions. At some random point in the future, the head of a chain will randomly happen to intersect a middle-link of the chain. We will harvest the crop in this chain intersection that already has its companion planted and replace it with the head of the new chain. When a drone is continuing the chain, it will always harvest() before replacing the tile with the next required companion to continue the chain. This gives us amortized results, where the algorithm requires some startup time to ramp up results / populate the game board with the chains.
I realize it's not the most efficient but hey at least it works on my machine. In addition, it is just hopium that plants grow faster than the time it takes for the next drone to randomly come to the tile and harvest it. That's why I used water.
The rates are in all honesty inefficient for the potential of parallelization. I'm still looking for improvements to steal. If you want a more efficient script that will farm all polyculture-enabled crops at a time, please modify the "onecrop" strategy in the below section to plant different crops based on a modulus rule on what column/row a onecrop tile is in.
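One possible shape for that modulus rule, sketched in plain Python. The crop names are placeholder strings standing in for the Entities.* values, SPACING mirrors ONECROP_TILE_SPACING, and crop_for_tile is a made-up helper, not something from my scripts.

```python
CROPS = ["grass", "bush", "tree", "carrot"]  # placeholders for Entities.*
SPACING = 5  # same idea as ONECROP_TILE_SPACING


def crop_for_tile(x, y):
    # Number the onecrop tiles by their grid cell, then cycle
    # through the crop list along both axes.
    col = x // SPACING
    row = y // SPACING
    return CROPS[(col + row) % len(CROPS)]


print(crop_for_tile(0, 0))   # grass
print(crop_for_tile(5, 0))   # bush
print(crop_for_tile(5, 5))   # tree
print(crop_for_tile(15, 5))  # grass
```

Adding col and row before taking the modulus makes neighbouring onecrop tiles differ in both directions, so every crop shows up about equally often across the board.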
Farming results follow the distribution of potential companions. Which I assume is uniform.
get_companion() is not None-safe, so I made my own version and put it into utils_farm. Please check that section for updated code.
TODO there will still be some warnings for failure to plant on a different tile type. This is due to a race condition I haven't figured out how to solve, but in the meantime the code will just skip the tile and continue so it should work okay.
p_pc
# parallel polyculture.
from utils_move import *
from utils_farm import *
from utils_grow import *
from utils_drone import *

# Hyperparameter chance for drone not to follow its companion.
# Should prevent drones bunching up.
# TODO Tune this later
# Tuning: 0.10: some warnings for planting nontill-plants on
# tilled tiles and conversely for till-plants
# Around 0.20 is fine. Still some warnings in the long run though.
HYP_RANDMOVE_CHANCE = 0.20
WATER_THRESH = 0.75

def stop_condition():
    # Write your desired target
    # condition to stop the run at.
    # True if condition fulfilled.
    # I have it set currently for Top Hat unlock.
    # below is just to shorten line width.
    _a = num_items(Items.Hay) > 1000000000
    _b = num_items(Items.Wood) > 10000000000
    _c = num_items(Items.Carrot) > 1000000000
    return all([_a, _b, _c])

def _subtask_pc():
    while True:
        if stop_condition():
            return True
        if random() < HYP_RANDMOVE_CHANCE:
            goto(((random() * ws) // 1, (random() * ws) // 1))
            continue
        companion, (x, y) = get_companion_saferand()
        goto((x, y))
        if companion == None:
            continue
        if can_harvest():
            harvest()
        till_for_entity(companion)
        plant(companion)
        maintain_water(WATER_THRESH)

def _factory_subtask_pc(args_tuple):
    start_pos = ((random() * ws) // 1, (random() * ws) // 1)
    def _subtask_randstart_pc():
        goto(start_pos)
        return _subtask_pc()
    return _subtask_randstart_pc

def maintask():
    recompute_parallelism()
    if parallelism_type == "none":
        _subtask_pc()
    elif parallelism_type in ("short", "long"):
        # The subtask is not finite. Therefore "long"
        # has no use. Use "short" drone strategy.
        recompute_parallelism("short")
        args = []
        for _ in range(max_drones()):
            args.append(())
        drone_task(_factory_subtask_pc, args)
    else:
        quick_print("Error: Unknown parallelism type:", parallelism_type)
        return None

if __name__ == "__main__":
    clear()
    # We start at (0, 0) with a grass entity.
    # After planting something, we plant its companion.
    # Rely on randomness to come back to the original plant
    # meaning there will be some start-up time required.
    maintask()
Specialized Polyculture (Parallelization)
I made this code that focused on farming one crop using polyculture. This is specifically for getting a large amount of a crop in a short timeframe.
As with the explanation under the Basic Crops section, always prioritize using this strategy over non-polyculture strategies (such as filling the entire field with carrots). There are two reasons: (1) cost-ratio efficiency, and (2) time-to-harvest efficiency.
I can't really be bothered to explain it all because I've already left a bunch of code comments in there. Please read them over for explanation.
If you choose to use fertilizer on a crop with a long growth time, it might use up a ton of it. The Tree is among the slowest, at a growth time between 5.6 and 8.4 seconds (see entity_growth_timing in utils_grow). So stock up before trying for achievements.
If you want to use fertilizer, I highly suggest you use water at the same time, with the "both" speedup strategy.
There's also no cost checking because I assume you're already well-established in terms of what you have in stock. TODO that might be for improvement in case anybody wants to use the code in their automation routines or meta-optimization algorithms.
Fix: fert usage didn't account for water level. Fert usage is now improved.
Moved some logic away into utils_grow
p_pc_onecrop
# Optimizes the output of a specific crop
# exclusively, using polyculture.
# One of the use cases of fertilizer since
# the surrounding crops don't matter.
# All we need is for them to exist to be
# counted as a companion plant.
# For Weird Substance farming, please enable
# the relevant flag and farm grass.
from utils_list import *
from utils_drone import *
from utils_grow import *
from utils_farm import *
from utils_move import *
from utils_item import *

ONECROP_TILE_SPACING = 5
# Caution: uses a lot of fertilizer. Always prioritize
# the "both" strategy over solely "fert".
# With 32 drones, farming trees, the "both" strategy
# costs approximately 100fert/sec, or 6k/min.
# Keep in mind to stock up before trying for achievements.
SPEEDUP_STRATEGY = "both" # One of "none", "water", "fert", "both"
WATER_THRESH_ONECROP = 0.95
FLAG_FARM_WEIRD_SUBSTANCE = False
TARGET_CROP = Entities.Carrot # Don't forget inside

def stop_condition(): # this function!!!
    # Specify your stopping condition here.
    return num_items(Items.Carrot) > 10000000000

def get_onecrop_indices():
    # Infer if onecrop's pc tiling range (3) in
    # a direct overlap with another onecrop is
    # acceptable based on user's selected SPACING.
    # This function computes overlap acceptability
    # in a relaxed manner;
    # (min of PC spacing and selected spacing.)
    recompute_ws()
    overlap_acceptable = ONECROP_TILE_SPACING <= 3
    step = ONECROP_TILE_SPACING
    indices = []
    y = 0
    while y < ws:
        x = 0
        while x < ws:
            indices.append(y * ws + x)
            x += step
        y += step
    if not overlap_acceptable:
        good_indices = []# Memory in this game is pretty much free
        for i in range(len(indices)):
            temp = indices[i]
            # Consider the 0th col always occupied.
            if not temp % ws > ws - min(3 + 1, ONECROP_TILE_SPACING):
                good_indices.append(temp)
        indices = good_indices
    return indices

def _factory_goto_and_plant(args_tuple):
    companion = args_tuple[0]
    pos = args_tuple[1]
    def _subtask_goto_and_plant():
        goto(pos)
        harvest()# destroy current plant
        till_for_entity(companion)
        if not plant(companion):
            pass # Don't return False here.
            # Just keep going and fail silently
        return True
    return _subtask_goto_and_plant

def _factory_onecrop(args_tuple):
    pos = args_tuple[0]
    local_p_type = args_tuple[1]
    def _subtask_onecrop():
        flag_has_used_fert_this_round = False
        goto(pos)
        # This is a temporary measure while I figure out how
        # to delay high-level drones from spawning more drones
        # while other high level drones are being spawned by
        # the main drone.
do_a_flip()
do_a_flip()
till_for_entity(TARGET_CROP)
plant(TARGET_CROP)
while not stop_condition():
if can_harvest():
if FLAG_FARM_WEIRD_SUBSTANCE:
use_item(Items.Weird_Substance)
harvest()
flag_has_used_fert_this_round = False
# Use this if any onecrop's pc tiling
# range (3) overlaps with another onecrop
# directly, where obstruction can occur if
# another onecrop's companion needs to be on
# the current onecrop's tile.
if ONECROP_TILE_SPACING <= 3:
till_for_entity(TARGET_CROP)
plant(TARGET_CROP)
companion, com_pos = get_companion_saferand()
args = [(companion, com_pos)]# length 1
prev_pos = get_pos()
drone_task(_factory_goto_and_plant, args, local_p_type)
goto(prev_pos)
else:
# Always water before using fert. Water decreases
# growth time multiplicatively while fert is additive.
if SPEEDUP_STRATEGY in ("both", "water"):
maintain_water(WATER_THRESH_ONECROP)
if SPEEDUP_STRATEGY in ("both", "fert"):
if not flag_has_used_fert_this_round:
# Fertilizer decreases growth time by 2 seconds.
actual_timing = calculate_actual_growth_time()
num_ferts_to_use = actual_timing // 2 + 1
for _ in range(num_ferts_to_use):
use_item(Items.Fertilizer)
flag_has_used_fert_this_round = True
if flag_has_used_fert_this_round:
use_item(Items.Weird_Substance)
if SPEEDUP_STRATEGY == "none":# ("none") is a plain string, not a tuple
do_a_flip()# Conserve energy.
return True
return _subtask_onecrop
def main():
# In this problem there are two locations
# of parallelization. 1: initial drone
# to each onecrop tile, and 2: further
# delegation to save time during
# companion planting
clear()
strategy = None
tile_1d_indices = get_onecrop_indices()
temp = len(tile_1d_indices)
# TODO consider recompute_parallelism
# manually since subtasks are not defined
# w.r.t. worldsize?
if parallelism_type == "none":
local_p_type = "none"
else:
recompute_parallelism()
local_p_type = "short"
args = []
# Don't index past the available onecrop tiles on small fields.
for i in range(min(max_drones(), temp)):
args.append((deserialize_2d_index(tile_1d_indices[i], ws), local_p_type))
drone_task(_factory_onecrop, args)
if __name__ == "__main__":
main()
Weird Substance (Old)
Here's some code to quickly get some Weird Substance on a wheat field. I don't bother with a larger scope for this because as soon as you get good multipliers on outputs, you can apply WS and get half of that high output. There's not much required to unlock mazes (on the order of only a couple hundred thousand).
If you require more, please see the specialized polyculture code, which has a function to farm Weird Substance as well.
def should_apply_weird_s(pos):
# Args:
# pos: 2-tuple curr pos (x, y)
# Returns:
# bool: Truth value
cx, cy = pos
return ((2 * cx + cy) % 5) == 0
if __name__ == "__main__":
clear()
ws = get_world_size()
for i in range(ws):
for j in range(ws):
if should_apply_weird_s((get_pos_x(), get_pos_y())):
use_item(Items.Weird_Substance)
move(East)
move(North)
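The `(2 * cx + cy) % 5` check picks tiles in a plus-shaped tiling. Here is a minimal plain-Python sketch of why that works, assuming one use of Weird Substance affects the tile under the drone and its four direct neighbours: every interior tile is then touched exactly once. `coverage_count` is a hypothetical helper, not part of the guide's utils.

```python
def should_apply_weird_s(pos):
    cx, cy = pos
    return ((2 * cx + cy) % 5) == 0

def coverage_count(x, y, n):
    # How many selected tiles touch (x, y) on an n-by-n field,
    # counting the tile itself and its four direct neighbours.
    # Edges are not wrapped here (an assumption for simplicity).
    count = 0
    for dx, dy in ((0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)):
        nx, ny = x + dx, y + dy
        if 0 <= nx < n and 0 <= ny < n and should_apply_weird_s((nx, ny)):
            count += 1
    return count

n = 10
interior = [coverage_count(x, y, n) for x in range(1, n - 1) for y in range(1, n - 1)]
print(set(interior))  # {1}
```

The five offsets of the plus shape change `2 * x + y` by 0, 1, 2, 3 and 4 modulo 5, so exactly one tile of every plus is selected. Tiles on the border can end up with zero coverage under the no-wrap assumption.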
Pumpkin (Parallelization)
Thanks to Wrath for the following code, because I got too lazy to write this myself. It makes parallel passes over the field and replants empty tiles and dead pumpkins until all pumpkins are grown. There may be some minor overhead between harvesting and replanting.
# Wrath's Pumpkin Parallelization code
# PUMPKIN PARLLL
from utils_list import *
from utils_move import *
from utils_item import *
from utils_drone import *
def build_row_regions(dim, workers):
# Returns a list of region tile-index lists (row stripes).
if workers <= 0:
workers = 1
if workers > dim:
workers = dim
base = dim // workers
rem = dim % workers
regions = []
i = 0
while i < workers:
if i < rem:
rows_for_i = base + 1
else:
rows_for_i = base
start_row = i * base
if i < rem:
start_row += i
else:
start_row += rem
end_row = start_row + rows_for_i
tiles = []
r = start_row
while r < end_row:
c = 0
while c < dim:
tiles.append(r * dim + c)
c += 1
r += 1
regions.append(tiles)
i += 1
return regions
def _factory_prepare_and_plant_region(args_tuple):
# args_tuple: (positions_list,)
region_positions = args_tuple[0]
def job():
i = 0
while i < len(region_positions):
idx = region_positions[i]
x, y = deserialize_2d_index(idx, ws)
goto((x, y))
if get_ground_type() != Grounds.Soil:
till()
if not can_pay_for(Entities.Pumpkin):
return False
plant(Entities.Pumpkin)
i += 1
return True
return job
def _factory_check_region_ready(args_tuple):
# args_tuple: (positions_list,)
region_positions = args_tuple[0]
def job():
# Return True only if every tile in region is harvestable pumpkin.
i = 0
all_ready = True
while i < len(region_positions):
idx = region_positions[i]
x, y = deserialize_2d_index(idx, ws)
goto((x, y))
ent = get_entity_type()
if ent == Entities.Dead_Pumpkin:
# Immediate fix; forces barrier to wait longer
if not can_pay_for(Entities.Pumpkin):
return False
if get_ground_type() != Grounds.Soil:
till()
plant(Entities.Pumpkin)
all_ready = False
else:
# Must be a pumpkin and must be harvestable
if ent != Entities.Pumpkin or not can_harvest():
all_ready = False
i += 1
return all_ready
return job
def _factory_harvest_region(args_tuple):
# args_tuple: (positions_list,)
region_positions = args_tuple[0]
def job():
# Harvest all tiles in the region (they should all be ready).
i = 0
while i < len(region_positions):
idx = region_positions[i]
x, y = deserialize_2d_index(idx, ws)
goto((x, y))
if get_entity_type() == Entities.Pumpkin and can_harvest():
harvest()
i += 1
return True
return job
def _factory_replant_region(args_tuple):
# args_tuple: (positions_list,)
region_positions = args_tuple[0]
def job():
# Re-till if needed and replant region.
i = 0
while i < len(region_positions):
idx = region_positions[i]
x, y = deserialize_2d_index(idx, ws)
goto((x, y))
if get_ground_type() != Grounds.Soil:
till()
if not can_pay_for(Entities.Pumpkin):
return False
plant(Entities.Pumpkin)
i += 1
return True
return job
def main():
recompute_ws()
# Force short mode (main participates; avoids long-mode watcher caveats).
recompute_parallelism("short")
workers = max_drones()
if workers < 1:
workers = 1
regions = build_row_regions(ws, workers)
# Build args as tuple-of-one per region
args = []
i = 0
while i < len(regions):
if len(regions[i]) > 0:
args.append((regions[i],))
i += 1
# Initial prepare + plant across all regions
results = drone_task(_factory_prepare_and_plant_region, args)
if results == None:
return
i = 0
while i < len(results):
if not results[i]:
return
i += 1
# Global synchronized cycles
while True:
# Wait until all regions report ready
all_ready = False
while not all_ready:
checks = drone_task(_factory_check_region_ready, args)
if checks == None:
return
all_ready = True
i = 0
while i < len(checks):
if not checks[i]:
all_ready = False
i += 1
# Harvest everywhere in parallel
hres = drone_task(_factory_harvest_region, args)
if hres == None:
return
i = 0
while i < len(hres):
if not hres[i]:
return
i += 1
# Replant everywhere in parallel
rres = drone_task(_factory_replant_region, args)
if rres == None:
return
i = 0
while i < len(rres):
if not rres[i]:
return
i += 1
if __name__ == "__main__":
clear()
main()
Old implementation idea for parallelization
Just noting this down. My old idea was to split the field naively among the drones. Each drone is responsible for a "patch" (when splitting along one dimension, a patch is a row if n_drones == ws; when going for locally close patches, cut the board into rectangles). Each drone then follows the typical serial pumpkin-growing algorithm on its patch. Workers remove themselves when their patch is fully grown. The main drone harvests and starts the loop again.
Pumpkin (Old)
queue_pumpkin
Uses a queue data structure in a smarter way than the naive approach, because memory in this game is practically free compared to time / tickspeed.
TODO implement speedup based on straggling pumpkins.
# Version that uses a head index into a list.
from utils_list import *
from utils_move import *
from utils_item import *
# TODO
speedup_field_percentage = 0.2
speedup_strategy_list = ["none", "water", "fertilizer"]
speedup_strategy = "none"
clear()
ws = get_world_size()
def till_all():
goto((0, 0))
for i in range(ws):
for j in range(ws):
if get_ground_type() != Grounds.Soil:
till()
move(East)
move(North)
return True
def replant_pumpkins():
if not can_pay_for(Entities.Pumpkin, ws * ws):
print("Cannot pay for all pumpkins")
return False
for i in range(ws):
for j in range(ws):
plant(Entities.Pumpkin)
move(East)
move(North)
return True
# queue as (list, head)
arr_is_grown = list(range(ws * ws))
head = 0
def q_not_empty():
return head < len(arr_is_grown)
def q_pop():
# O(1) amortized pop-front
global head
x = arr_is_grown[head]
head += 1
return x
def q_push(x):
arr_is_grown.append(x)
def q_compact_if_needed():
# occasional compact to free consumed prefix
global arr_is_grown
global head
if head > 1024 and head > len(arr_is_grown) // 2:
arr_is_grown = arr_is_grown[head:]
head = 0
till_all()
replant_pumpkins()
while True:
if not q_not_empty():
harvest()
goto((0, 0))
arr_is_grown = list(range(ws * ws))
head = 0
if not replant_pumpkins():
break
while q_not_empty():
curr_pos_to_check = q_pop()
goto(deserialize_2d_index(curr_pos_to_check, ws))
if not can_harvest():
if get_entity_type() == Entities.Dead_Pumpkin:
if not can_pay_for(Entities.Pumpkin):
break
plant(Entities.Pumpkin)
q_push(curr_pos_to_check)
q_compact_if_needed()
Naive
My old code. Use the above instead.
from utils_list import *
from utils_move import *
from utils_item import *
speedup_field_percentage = 0.2
speedup_strategy_list = ["none", "water", "fertilizer"]
speedup_strategy = "none"
clear()
ws = get_world_size()
# Serialized list of positions on the field
# Represents ungrown pumpkins still needed to revisit
arr_is_grown = list(range(ws * ws))
def replant_pumpkins():
if not can_pay_for(Entities.Pumpkin, ws * ws):
return False
for i in range(ws):
for j in range(ws):
if get_ground_type() != Grounds.Soil:
till()
plant(Entities.Pumpkin)
move(East)
move(North)
return True
replant_pumpkins()
while True:
if not arr_is_grown:
harvest()
goto((0, 0))
arr_is_grown = list(range(ws * ws))
if not replant_pumpkins():
break
while arr_is_grown:
curr_pos_to_check = arr_is_grown.pop(0)# Killer O(n).
goto(deserialize_2d_index(curr_pos_to_check, ws))
if not can_harvest():
if get_entity_type() == Entities.Dead_Pumpkin:
if not check_has_item(Items.Carrot, 1):
break
if get_ground_type() != Grounds.Soil:
till()
plant(Entities.Pumpkin)
arr_is_grown.append(curr_pos_to_check)
Cactus (Parallelization, doesn't use Factory)
Given the constraints of using a drone, we can only:
Move one tile at a time
Swap the cactus underneath the drone with an adjacent one (in a cross pattern)
It seems like shakersort was built for these constraints. Compared to gnomesort (manual insertion sort), it seems more efficient because we can still perform sorting (swaps) on the way back while repositioning the drone.
Uses rowwise then colwise shakersort. Per row or column of length n, this should need at most n(n-1)/2 comparisons, a nicer constant than the naive n**2. There will rarely be a speedup past that, however, because axis-sorting requires all sorting jobs on one axis to finish before proceeding to the next axis. On the other hand, this per-axis strategy is what makes the algorithm accessible for parallelization later on.
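For reference, here is shakersort with shrinking left/right limits on a plain Python list, so the idea can be checked away from the drone. This is a sketch under my own naming (`shaker_sort`, `comparisons`), not the drone code itself. Each pass stays inside the current limits, which is what keeps the comparison count at or below n(n-1)/2.

```python
def shaker_sort(values):
    # Cocktail shaker sort with "last swap" bounds, mirroring the
    # left/right limits used by the drone version.
    a = list(values)
    comparisons = 0
    left, right = 0, len(a) - 1
    while left < right:
        # Eastward pass: bubble the largest value towards `right`.
        last_swap = left
        for i in range(left, right):
            comparisons += 1
            if a[i] > a[i + 1]:
                a[i], a[i + 1] = a[i + 1], a[i]
                last_swap = i
        right = last_swap
        # Westward pass: bubble the smallest value towards `left`.
        last_swap = right
        for i in range(right, left, -1):
            comparisons += 1
            if a[i - 1] > a[i]:
                a[i - 1], a[i] = a[i], a[i - 1]
                last_swap = i
        left = last_swap
    return a, comparisons

print(shaker_sort([5, 4, 3, 2, 1]))  # ([1, 2, 3, 4, 5], 10)
```

A fully reversed list is the worst case and hits exactly n(n-1)/2 comparisons. An already sorted list finishes after a single eastward pass.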
Math 😱😱
If you haven't noticed already, sorting rowwise then sorting columns won't un-sort the rows.
"Stability" of row-sortedness after column-sorting comes from the Monotonicity Principle (which comes from the Pigeonhole principle). If the initial row sort ensures that every element in column j is less than or equal to its row-neighbor in column j+1 (where $A_{i,j} \leq A_{i,j+1}$), the relative order between the two columns is preserved regardless of how the individual columns are sorted. The k-th smallest value in column j must be less than or equal to the k-th smallest value in column j+1.
In English:
When you sort every row (left to right), you establish a rule that is true for every single row: nothing in column j is greater than its neighbor in column j+1.
Sorting the columns up and down simply moves the largest numbers in each column to the top rows and the smallest numbers to the bottom rows.
Because the rule established in step one is true for every pair of elements across the columns, the vertical sorting cannot break the horizontal order.
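The claim is easy to check empirically. The following plain-Python sketch (not in-game code; the helper names are mine) sorts every row, then every column of a random grid, and confirms that the rows are still sorted afterwards.

```python
import random

def sort_rows_then_cols(grid):
    # Sort every row, then every column, and return the new grid.
    rows = [sorted(row) for row in grid]
    cols = [sorted(col) for col in zip(*rows)]
    # Transpose back so the result is indexed as result[row][col].
    return [list(row) for row in zip(*cols)]

def rows_sorted(grid):
    return all(row[i] <= row[i + 1] for row in grid for i in range(len(row) - 1))

def cols_sorted(grid):
    return rows_sorted([list(col) for col in zip(*grid)])

random.seed(0)
grid = [[random.randint(0, 15) for _ in range(8)] for _ in range(8)]
result = sort_rows_then_cols(grid)
print(rows_sorted(result), cols_sorted(result))  # True True
```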
In addition, after sorting row-wise and starting the col-wise sort, the first and last columns seem to always finish faster.
I think this comes from boundary saturation.
Suppose H is the maximum height of a cactus (which is 15).
After you finish the row-wise sort, in each row, the first column holds that row’s minimum, and the last column holds that row’s maximum.
So when you then sort by columns, the first column is a list of row-minima, and the last column is a list of row-maxima.
If cactus height selection is independent and identically distributed (i.i.d.):
The distribution of the row minimum concentrates near 0.
The distribution of the row maximum concentrates near H. As the row length m grows, the chance that a row actually contains the absolute min (0) or the absolute max (H) grows, so many rows will contain at least one min (0) and/or max (15). If a row doesn't contain one of these, chances are even higher that it contains a value one off from the min or max (1 or 14).
Statistically, the probability that a row of length m contains at least one cactus of the maximum height H (with heights uniform on 0..H) is:
$\Pr(\text{row max} = H) = 1 - \left( \frac{H}{H + 1} \right)^{m}$
Pretend there's LaTeX rendering for that code block. What that basically says is, for example values of m (with H = 15):
m=4: 22.75%
m=8: 40.33%
m=16: 64.39%
m=22: 75.82%
m=32: 87.32%
So at max world size, each row has an 87% chance of containing an element of the maximum value (15). The same holds for the minima. Thus, a large fraction of rows will have them.
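If you want to plug in your own numbers, the formula is a one-liner in plain Python. `p_row_has_max` is a name I made up. It assumes heights are i.i.d. and uniform on 0..H, the same assumption as above, and it reproduces the table values up to rounding.

```python
def p_row_has_max(H, m):
    # Probability that at least one of m i.i.d. heights, uniform on
    # 0..H, equals H.
    return 1 - (H / (H + 1)) ** m

for m in (4, 8, 16, 22, 32):
    print(m, round(100 * p_row_has_max(15, m), 2))
```

Changing `H` shows how sensitive the effect is to the number of distinct heights: fewer distinct heights means more ties at the edges.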
After a row-wise sort, this creates lots of ties at the edges, which means fewer inversions and fewer swaps for shakersort on those edge columns, because equal-valued elements don't need swapping. Any sorting algorithm should clear such columns much faster.
Just note that this only applies after we've already sorted along one dimension.
We can just barely make use of this property for parallelization. If our parallelization strategy makes use of the main drone as a worker itself (I call this strategy "short"), instead of using it as a task delegator, having the main drone work on the first or last column is useful because it will free up sooner and can deal with any logical administrative tasks immediately afterwards while slower drones take care of the middle of the field. I'm guessing it's probably only statistically significant above ws=22, and when it does apply, it'll probably be an extremely small speedup.
Additional Notes
I probably won't update this function to use the new drone_task type syntax. This is already clean enough and works.
Improvement
There is an opening for improvement where finished drones can help any remaining drones sort their rows or columns. Since sorting a row is stateless w.r.t. cactus value, and since a swap is (I suspect) an atomic operation w.r.t. timing and the tick system, additional drones can work an individual shakersort on the same row w.r.t. variable environment. Drones update their own left and right limits on their own.
Requirements:
Make sure "long" drone mode works by picking out the first finished job and not the first started job.
Limit additional helper drones to 1. 100% boost across rows is more valuable than a 400% boost on one row.
Start a shake_row task from the field-middle outwards.
Code
# Strategy: double shakersort
# Uses parallelization via utils_drone.drone_AA_task
from utils_list import *
from utils_item import *
from utils_farm import *
from utils_move import *
from utils_drone import *
def shake_row():
# Shakersort the current row; uses goto_x to anchor & restore.
prev_x = get_pos_x()
goto_x(0)
left = 0
right = ws - 1
dir = East
while left < right:
if dir == East:
cx = get_pos_x()
loc_last_swap = left
while cx < right:
curr = measure()
nxt = measure(dir)
if curr > nxt:
swap(dir)
loc_last_swap = cx
move(dir)
cx += 1
right = loc_last_swap
dir = u_turn(dir)
if dir == West:
cx = get_pos_x()
loc_last_swap = right
while cx > left:
curr = measure()
nxt = measure(dir)
if curr < nxt:
swap(dir)
loc_last_swap = cx
move(dir)
cx -= 1
left = loc_last_swap
dir = u_turn(dir)
goto_x(prev_x)
return True
def shake_col():
# Shakersort the current column; uses goto_y to anchor & restore.
prev_y = get_pos_y()
goto_y(0)
left = 0
right = ws - 1
dir = North
while left < right:
if dir == North:
cy = get_pos_y()
loc_last_swap = left
while cy < right:
curr = measure()
nxt = measure(dir)
if curr > nxt:
swap(dir)
loc_last_swap = cy
move(dir)
cy += 1
right = loc_last_swap
dir = u_turn(dir)
if dir == South:
cy = get_pos_y()
loc_last_swap = right
while cy > left:
curr = measure()
nxt = measure(dir)
if curr < nxt:
swap(dir)
loc_last_swap = cy
move(dir)
cy -= 1
left = loc_last_swap
dir = u_turn(dir)
goto_y(prev_y)
return True
def _subtask_plant_eastwards():
# Plant one full row, moving east ws steps (wraps back to start col)
for _ in range(ws):
plant(Entities.Cactus)
move(East)
return True
def plant_cacti():
# """Plant cactus on every tile; parallelism handled inside drone_AA_task."""
recompute_parallelism()
if not can_pay_for(Entities.Cactus, ws * ws):
print("Cannot pay for Cactus, ws*ws")
return False
return drone_naive_AA_task(_subtask_plant_eastwards, North)
def maintask():
#"""Plant, row shakersort sweep, column shakersort sweep, harvest."""
recompute_parallelism()
if not plant_cacti():
return False
# Shaker rows (ws passes north)
if not drone_naive_AA_task(shake_row, North):
return False
goto((0, 0))
# Shaker cols (ws passes east)
if not drone_naive_AA_task(shake_col, East):
return False
goto((0, 0))
harvest()
return True
if __name__ == "__main__":
clear()
till_all()
while maintask():
continue
Cactus (TODO Parallelization Test Improvement)
Test script
Here is code to empirically test my hypothesis that multiple shakersort task instances on one row will not interfere with each other due to three properties:
How the non-greedy limit-updating functionality operates inside shakersort
Shakersort is stateless w.r.t. the cactus value the drone is currently "holding"
Cactus-swaps are atomic operations.
However I'm still investigating if there are any potential race conditions.
from utils_list import *
from utils_drone import *
from utils_grow import *
from utils_farm import *
from utils_move import *
from utils_item import *
if __name__ == "__main__":
clear()
for i in range(ws):
till()
plant(Entities.Cactus)
move(East)
def _factory(args_tuple):
def shake_row():
# Shakersort the current row; uses goto_x to anchor & restore.
prev_x = get_pos_x()
goto_x(0)
left = 0
right = ws - 1
dir = East
while left < right:
if dir == East:
cx = get_pos_x()
loc_last_swap = left
while cx < right:
curr = measure()
nxt = measure(dir)
if curr > nxt:
swap(dir)
loc_last_swap = cx
move(dir)
cx += 1
right = loc_last_swap
dir = u_turn(dir)
if dir == West:
cx = get_pos_x()
loc_last_swap = right
while cx > left:
curr = measure()
nxt = measure(dir)
if curr < nxt:
swap(dir)
loc_last_swap = cx
move(dir)
cx -= 1
left = loc_last_swap
dir = u_turn(dir)
goto_x(prev_x)
return True
return shake_row
args = []
# If swaps are truly atomic there
# will be no use for delaying.
# base_delay = 1000
base_delay = 0
for i in range(5):
args.append((i*base_delay, ))
drone_task(_factory, args)
Cactus (Old)
Old Code for Shakersort (pre-parallelization era)
# Strategy: double shakersort
from utils_list import *
from utils_item import *
from utils_move import *
ws = get_world_size()
def till_all():
for i in range(ws):
for j in range(ws):
if get_ground_type() != Grounds.Soil:
till()
move(East)
move(North)
def plant_cacti():
if not can_pay_for(Entities.Cactus, ws*ws):
print("Cannot pay for Cactus, ws*ws")
return False
for i in range(ws):
for j in range(ws):
plant(Entities.Cactus)
move(East)
move(North)
return True
def shake_row():
# Shakersort the row.
goto_x(0)
left = 0
right = ws - 1
dir = East
while left < right:
if dir == East:
cx = get_pos_x()
loc_last_swap = left
while cx < right:
curr = measure()
next = measure(dir)
if curr > next:
swap(dir)
loc_last_swap = cx
move(dir)
cx += 1
right = loc_last_swap
dir = u_turn(dir)
if dir == West:
cx = get_pos_x()
loc_last_swap = right
while cx > left:
curr = measure()
next = measure(dir)
if curr < next:
swap(dir)
loc_last_swap = cx
move(dir)
cx -= 1
left = loc_last_swap
dir = u_turn(dir)
def shake_col():
# Shakersort the column.
goto_y(0)
left = 0
right = ws - 1
dir = North
while left < right:
if dir == North:
cy = get_pos_y()
loc_last_swap = left
while cy < right:
curr = measure()
next = measure(dir)
if curr > next:
swap(dir)
loc_last_swap = cy
move(dir)
cy += 1
right = loc_last_swap
dir = u_turn(dir)
if dir == South:
cy = get_pos_y()
loc_last_swap = right
while cy > left:
curr = measure()
next = measure(dir)
if curr < next:
swap(dir)
loc_last_swap = cy
move(dir)
cy -= 1
left = loc_last_swap
dir = u_turn(dir)
if __name__ == "__main__":
clear()
till_all()
while True:
if not plant_cacti():
break
for i in range(ws):
shake_row()
move(North)
goto((0, 0))
for j in range(ws):
shake_col()
move(East)
goto((0, 0))
harvest()
Naive
Make two n**2 bubble passes, one rowwise and one colwise.
Sorry BylliGoat for the passing shade, but watching the drone do massive passes on the 22x22 field while there were only two tiny pockets of unsorted cacti remaining made me mildly annoyed. I guess it's my fault for choosing to use an n**2 solution in the first place instead of making my own. I got too lazy and just chose the first cactus steam guide I saw, and didn't notice there was one right beside it that implemented gnomesort instead.
Sunflower (Parallelization, uses Factory)
The following two parallelized versions of my solution to the sunflower problem use an algorithm that manually finds the max sunflowers.
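The bucket idea behind both versions, as a plain-Python sketch: group tile indices by petal count, then visit the buckets from the highest petal count down. It assumes petal counts run from 7 to 15, matching the BUCKET_MIN / BUCKET_MAX constants in the code. `harvest_order` is a hypothetical helper that only illustrates the ordering, not the drone work.

```python
BUCKET_MIN = 7
BUCKET_MAX = 15

def harvest_order(petals):
    # petals: flat list of petal counts, indexed by serialized tile index.
    buckets = [[] for _ in range(BUCKET_MAX - BUCKET_MIN + 1)]
    for idx, value in enumerate(petals):
        buckets[value - BUCKET_MIN].append(idx)
    order = []
    for v in range(BUCKET_MAX, BUCKET_MIN - 1, -1):
        order.extend(buckets[v - BUCKET_MIN])
    return order

print(harvest_order([7, 15, 9, 15, 8]))  # [1, 3, 2, 4, 0]
```

Filling the buckets is a single pass over the field, so no comparison sort is needed. Tiles inside one bucket can be harvested in any order, which is what lets the work be split across drones.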
Additional implementation idea
I think that unironically, a naive parallelization approach that persists drones between bucket passes will be faster. Here is the overview:
The strategies below work better with the "short" parallelization strategy, because each subtask is uniform in length and there is little use in respawning drones while the overall job is underway.
If ws = max_drones():
- Spawn drones on a column and wait for them to line up and wait for all flowers to grow.
- Make 8 passes (for each petal value) using each drone for its row, at the same time.
- As soon as one pass is done you can immediately proceed to the next petal value and make a pass. My algorithm needs to respawn drones, which is inefficient, but it looks funny.
If ws != max_drones():
- Plant the field. No need to keep track.
- Spawn drones and arrange their starting_positions as 1d board indices spaced according to pass_length = ws * ws // max_drones() + 1, so that the drones cover all ws * ws tiles between them.
- For each drone, traverse pass_length consecutive 1d field indices, incrementing field_idx and calling goto(deserialize_2d_index(field_idx, ws)) at each step. Harvest any sunflowers of level 15 along the way.
- Repeat passes for decreasing petal values.
I leave this as an exercise to the reader because I'm too lazy
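A small head start on that exercise, in plain Python: splitting the serialized field indices into one contiguous chunk per drone. This is a sketch assuming each drone should take roughly ws * ws / n_drones tiles. `drone_chunks` is a made-up name and no drone spawning happens here.

```python
def drone_chunks(ws, n_drones):
    # Split the ws*ws serialized tile indices into contiguous chunks,
    # one per drone. pass_length is the ceiling of ws*ws / n_drones.
    total = ws * ws
    pass_length = (total + n_drones - 1) // n_drones
    chunks = []
    start = 0
    while start < total:
        chunks.append((start, min(start + pass_length, total)))
        start += pass_length
    return chunks

print(drone_chunks(4, 3))  # [(0, 6), (6, 12), (12, 16)]
```

Each pair is a half-open (start, end) range of 1d indices. The last chunk may be shorter than the others, and on small fields there can be fewer chunks than drones.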
Notes
After figuring out a suitable drone_task function that can take **args in a convoluted way, I revised my first parallelization attempt into the following code.
Noting the factory version subtasks, they are not mathematically closed w.r.t. starting position because the location in which drones are spawned does not matter:
The overall task time will be the same no matter the spawning location. This is because the game board is a torus / infinite tiling, and there will always exist a "worst distance" of worldsize taxicab dist away from your current position. I return to (0, 0) after the overall task anyways.
The subtask, defined as "going to and harvesting at positions defined in a queue" orchestrated per petal value, does not benefit from being closed due to the above point; no matter the spawning point of drones for the next petal value, there will always exist a worst distance. In fact they benefit because subtask drones end their process faster due to not requiring a return to original position.
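The "worst distance" argument can be checked with a few lines of plain Python. This is a sketch assuming movement wraps around the field edges on both axes, which is what the torus remark relies on. The function names are mine.

```python
def torus_taxicab(a, b, ws):
    # Taxicab distance between tiles a and b when the field wraps
    # around on both axes.
    dx = abs(a[0] - b[0])
    dy = abs(a[1] - b[1])
    return min(dx, ws - dx) + min(dy, ws - dy)

def worst_distance(start, ws):
    return max(torus_taxicab(start, (x, y), ws) for x in range(ws) for y in range(ws))

ws = 8
print({worst_distance((x, y), ws) for x in range(ws) for y in range(ws)})  # {8}
```

Every starting tile has the same worst distance, so the spawn location cannot improve the worst case. For even world sizes that worst distance equals ws, and for odd world sizes it is ws - 1.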
p_bucket_sf_factory
# Version that uses the drone_task function I wrote in utils_drone.
from utils_list import *
from utils_item import *
from utils_move import *
from utils_drone import *
from utils_farm import *
BUCKET_MIN = 7
BUCKET_MAX = 15
NUM_BUCKETS = BUCKET_MAX - BUCKET_MIN + 1
def b_idx(v):
return v - BUCKET_MIN
def _factory_plant_row(args_tuple):
# args_tuple: (row_index,)
row_index = args_tuple[0]
def job():
start = get_pos()
goto((0, row_index))
record = []
for _ in range(ws):
plant(Entities.Sunflower)
record.append(measure())
move(East)
goto(start)
return record
return job
def plant_sf():
recompute_parallelism()
args = []
i = 0
while i < ws:
args.append((i,))
i += 1
results = drone_task(_factory_plant_row, args)
if results == None:
return False
field = []
i = 0
while i < ws:
field.append(results[i])
i += 1
return field
def _factory_harvest_worker(args_tuple):
# args_tuple: (positions_list,)
positions = args_tuple[0]
def worker():
q_to_visit = positions
head = 0
while head < len(q_to_visit):
next = q_to_visit[head]
head += 1
goto(deserialize_2d_index(next, ws))
if get_entity_type() == Entities.Sunflower:
if can_harvest():
harvest()
else:
q_to_visit.append(next)
return True
return worker
def harvest_sf(buckets):
recompute_parallelism()
# SERIAL (no parallelism)
if parallelism_type == "none":
v = BUCKET_MAX
while v >= BUCKET_MIN:
k = b_idx(v)
cbucket = buckets[k]
if len(cbucket) > 0:
_factory_harvest_worker((cbucket,))()# 1-tuple: the factory reads args_tuple[0]
v -= 1
return True
# PARALLEL (short/long)
v = BUCKET_MAX
while v >= BUCKET_MIN:
k = b_idx(v)
cbucket = buckets[k]
if len(cbucket) > 0:
workers = max_drones()
if workers < 1:
workers = 1
if parallelism_type == "long" and workers > 1:
workers -= 1
# Use unsafe matrix split for spatial locality
# TODO: I need a better heuristic for
# locality split.
parts = deserialize_2d_matrix_unsafe_CC(cbucket, workers)
args = []
i = 0
while i < len(parts):
if len(parts[i]) > 0:
args.append((parts[i],))
i += 1
if len(args) > 0:
results = drone_task(_factory_harvest_worker, args)
if results == None:
return False
i = 0
while i < len(results):
if not results[i]:
return False
i += 1
v -= 1
return True
def calculate_buckets_from_field_2d(field_2d):
buckets = []
for _ in range(NUM_BUCKETS):
buckets.append([])
field = serialize_2d_matrix(field_2d)
for i in range(len(field)):
buckets[b_idx(field[i])].append(i)
return buckets
if __name__ == "__main__":
clear()
till_all()
while True:
goto((0, 0))
field_2d = plant_sf()
if not field_2d:
break
buckets = calculate_buckets_from_field_2d(field_2d)
if not harvest_sf(buckets):
break
Sunflower (Parallelization, doesn't use Factory)
You should note that funnily enough, I wrote this version before even considering the naive version of parallelization, which would be to spawn drones along a column, and have them traverse each row and harvest the maximum (15), and then repeat that for each sunflower petal value in decreasing order.
... Am I actually the naive one here?
In addition, I also wrote this before I figured out my parallelization factory function strategy to take care of inputs into subtasks.
p_bucket_sf
from utils_list import *
from utils_item import *
from utils_move import *
from utils_drone import *
# TODO figure out a way to persist drones through buckets while
# letting them wait for other drones to finish their portion of bucket.
# The overhead is quite minimal, but we can still save time there.
BUCKET_MIN = 7
BUCKET_MAX = 15
NUM_BUCKETS = BUCKET_MAX - BUCKET_MIN + 1
def b_idx(v):
return v - BUCKET_MIN
def _subtask_till_eastwards():
# Till one full row, moving east ws steps (wraps back to start col)
for _ in range(ws):
if get_ground_type() != Grounds.Soil:
till()
move(East)
return True
def till_all():
# """Till every row once; parallelism handled inside drone_AA_task."""
recompute_parallelism()
return drone_naive_AA_task(_subtask_till_eastwards, North)
def _subtask_plant_sf_eastwards():
record = []
for _ in range(ws):
plant(Entities.Sunflower)
record.append(measure())
move(East)
return record
def plant_sf():
    # TODO: Take care of non-bool outputs. Here is custom version
    # for the simple usecase of sunflowers.
    recompute_parallelism()
    dir = North
    length = ws
    field = []# 2d matrix
    for _ in range(length):
        field.append([])
    _drones = []# collection of <drone> objects
    _drones_idx = [] # index w.r.t. field
    if parallelism_type == "none":
        # Does not account for drone parallelism.
        for i in range(length):
            res = _subtask_plant_sf_eastwards()
            if not res:
                return False
            field[i] = res
            move(dir)
        return field
    elif parallelism_type == "short":
        # Uses the main drone as a worker.
        for i in range(length):
            subtask_result_watcher = spawn_drone(_subtask_plant_sf_eastwards)
            if subtask_result_watcher:
                # Lists don't support inserting at uninitialized indices.
                # Fix with dict.
                _drones.append(subtask_result_watcher)
                _drones_idx.append(i)
            else:
                res = _subtask_plant_sf_eastwards()
                if not res:
                    return False
                field[i] = res# Directly insert into field. there is no
                # <drone> to wait for.
            move(dir)
        while _drones_idx:
            idx = _drones_idx.pop()
            res = _drones.pop()
            res = wait_for(res)
            if not res:
                return False
            field[idx] = res
        return field
    elif parallelism_type == "long":
        counter, limit = 0, length
        while counter < limit:
            if len(_drones) < max_drones():
                subtask_result_watcher = spawn_drone(_subtask_plant_sf_eastwards)
                if subtask_result_watcher:
                    _drones.insert(0, subtask_result_watcher)
                    _drones_idx.insert(0, counter)
                    counter += 1
                    move(dir)
                else:
                    # n_drones under capacity but spawn_drone() failed.
                    quick_print("Error: unexpected spawn_drone() failure")
                    return False
            if len(_drones) >= max_drones():
                idx = _drones_idx.pop()# TODO detect the first
                # exited drone instead of first-job-started drone
                res = _drones.pop()
                res = wait_for(res)
                if not res:
                    return False
                field[idx] = res
        while _drones:
            idx = _drones_idx.pop()
            res = _drones.pop()
            res = wait_for(res)
            if not res:
                return False
            field[idx] = res
        return field
    else:
        print("Error: Unknown parallelism type:", parallelism_type)
        return False
def _factory_harvest_worker(positions):
    # This function constructs zero-arg functions for use in spawn_drone.
    # For more information, look up the Factory coding design pattern.
    # Args:
    #     positions: list of serialized 2d indices.
    # Returns:
    #     function: zero-arg ready-to-deploy worker
    def worker():
        q_to_visit = positions
        head = 0
        while head < len(q_to_visit):
            next = positions[head]
            head += 1
            goto(deserialize_2d_index(next, ws))
            if get_entity_type() == Entities.Sunflower:
                if can_harvest():
                    harvest()
                else:
                    q_to_visit.append(next)# requeue: still growing
        return True
    return worker
def harvest_sf(buckets):
    # Consider variable buckets destroyed/unusable after use.
    recompute_parallelism()
    _drones = []
    if parallelism_type == "none":
        v = BUCKET_MAX
        while v >= BUCKET_MIN:
            k = b_idx(v)
            cbucket = buckets[k]
            if len(cbucket) > 0:
                _factory_harvest_worker(cbucket)()
            v -= 1
        return True
    elif parallelism_type == "short":
        cbucket_counter = BUCKET_MAX
        while cbucket_counter >= BUCKET_MIN:
            if len(buckets[b_idx(cbucket_counter)]):
                # Partitions should contain locally close
                # positions according to deserialization logic.
                partition = deserialize_2d_matrix_unsafe_CC(buckets[b_idx(cbucket_counter)], max_drones())
                for i in range(len(partition)):
                    positions = partition[i]
                    subtask_watcher = spawn_drone(_factory_harvest_worker(positions))
                    if subtask_watcher:
                        _drones.append(subtask_watcher)
                    else:
                        if not _factory_harvest_worker(positions)():
                            return False
                while _drones:
                    if not wait_for(_drones.pop()):
                        return False
            else:
                # Nothing in this bucket.
                pass
            cbucket_counter -= 1
        return True
    elif parallelism_type == "long":
        cbucket_counter = BUCKET_MAX
        while cbucket_counter >= BUCKET_MIN:
            if len(buckets[b_idx(cbucket_counter)]):
                # Partitions should contain locally close
                # positions according to deserialization logic.
                partition = deserialize_2d_matrix_unsafe_CC(buckets[b_idx(cbucket_counter)], max_drones() - 1)
                counter, limit = 0, len(partition)
                while counter < limit:
                    positions = partition[counter]
                    if len(_drones) < max_drones():
                        subtask_watcher = spawn_drone(_factory_harvest_worker(positions))
                        if subtask_watcher:
                            _drones.insert(0, subtask_watcher)
                            counter += 1
                        else:
                            # n_drones under capacity but spawn_drone() failed.
                            quick_print("Error: unexpected spawn_drone() failure")
                            return False
                    if len(_drones) >= max_drones():
                        if not wait_for(_drones.pop()):
                            return False
                while _drones:
                    if not wait_for(_drones.pop()):
                        return False
            else:
                # Nothing in this bucket.
                pass
            cbucket_counter -= 1
        return True
    else:
        print("Error: Unknown parallelism type:", parallelism_type)
        return False
def calculate_buckets_from_field_2d(field_2d):
    buckets = []
    for _ in range(NUM_BUCKETS):
        buckets.append([])
    field = serialize_2d_matrix(field_2d)
    for i in range(len(field)):
        buckets[b_idx(field[i])].append(i)
    return buckets
if __name__ == "__main__":
    clear()
    till_all()
    while True:
        goto((0, 0))
        field_2d = plant_sf()
        buckets = calculate_buckets_from_field_2d(field_2d)
        harvest_sf(buckets)
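The harvest factory above builds zero-arg functions for spawn_drone, so any inputs have to be baked in through a closure. Below is a minimal plain-Python sketch of that pattern plus the requeue loop, runnable outside the game. The names make_worker, make_flaky_ready and is_ready are made up for illustration (is_ready stands in for can_harvest()), and unlike the in-game worker this sketch copies the list instead of mutating it.

```python
def make_worker(positions, is_ready):
    # Factory: returns a zero-arg function that remembers `positions`.
    # `is_ready` is a hypothetical stand-in for the in-game can_harvest().
    def worker():
        queue = list(positions)  # copy, so the caller's list stays intact
        harvested = []
        head = 0
        while head < len(queue):
            pos = queue[head]
            head += 1
            if is_ready(pos):
                harvested.append(pos)
            else:
                queue.append(pos)  # requeue: not ready yet, retry later
        return harvested
    return worker


def make_flaky_ready(not_ready_once):
    # Test helper: each listed position reports "not ready" exactly once.
    pending = set(not_ready_once)

    def is_ready(pos):
        if pos in pending:
            pending.discard(pos)
            return False
        return True
    return is_ready


positions = [0, 1, 2]
worker = make_worker(positions, make_flaky_ready([0]))
result = worker()  # position 0 is deferred once, so it is harvested last
```

The deferred position ends up at the back of the queue, which is the same behaviour as the q_to_visit.append(...) line in the in-game worker.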
Sunflower (Old)
Uses a bucket system to keep track of sunflowers of various petal number values.
The corner case usually applies only to value-15 sunflowers, since they can still be growing when the drone switches into harvesting mode. Harvesting starts from the last-planted position because of .pop(), and there will be a little overhead.
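A minimal sketch of the bucket idea in plain Python (not the in-game DSL, so list comprehensions are allowed here). The function names build_buckets and harvest_order and the sample petal counts are made up for illustration; only BUCKET_MIN = 7 and BUCKET_MAX = 15 come from the code in this guide.

```python
BUCKET_MIN, BUCKET_MAX = 7, 15


def build_buckets(petals):
    # One bucket per petal count; each bucket stores serialized positions.
    buckets = [[] for _ in range(BUCKET_MAX - BUCKET_MIN + 1)]
    for pos, value in enumerate(petals):
        buckets[value - BUCKET_MIN].append(pos)
    return buckets


def harvest_order(buckets):
    # Drain buckets from 15 petals down to 7. pop() takes from the end,
    # so positions inside a bucket come out in reverse planting order.
    order = []
    for value in range(BUCKET_MAX, BUCKET_MIN - 1, -1):
        bucket = buckets[value - BUCKET_MIN]
        while bucket:
            order.append(bucket.pop())
    return order


petals = [9, 15, 7, 15, 12]  # hypothetical measure() results per position
order = harvest_order(build_buckets(petals))
# order == [3, 1, 4, 0, 2]
```

Filling the buckets is one pass over the field, and harvesting never has to search for the current maximum.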
bucket_sunflowers
from utils_list import *
from utils_move import *
BUCKET_MIN = 7
BUCKET_MAX = 15
NB = BUCKET_MAX - BUCKET_MIN + 1
def bidx(v):
    return v - BUCKET_MIN
clear()
ws = get_world_size()
# Need ♥♥♥♥♥♥ list comprehension
# buckets = [[] for _ in range(NB)]
buckets = []
for _ in range(NB):
    buckets.append([])
remaining = 0
def replant_sf():
    global buckets
    global remaining
    for k in range(NB):
        buckets[k] = []
    remaining = 0
    goto((0, 0))
    for i in range(ws):
        for j in range(ws):
            if get_ground_type() != Grounds.Soil:
                till()
            plant(Entities.Sunflower)
            pos = serialize_2d_index((j, i), ws)
            v = measure()
            buckets[bidx(v)].append(pos)
            remaining += 1
            move(East)
        move(North)
while True:
    replant_sf()
    while remaining > 0:
        progressed = False
        # Descend from 15 -> 7, drain each bucket this pass.
        for v in range(BUCKET_MAX, BUCKET_MIN - 1, -1):
            k = bidx(v)
            b = buckets[k]
            while b:
                pos = b.pop()
                goto(deserialize_2d_index(pos, ws))
                if can_harvest():
                    harvest()
                    remaining -= 1
                    progressed = True
                else:
                    # Corner-case: push deferred item to
                    # front of the bucket
                    # (acceptable O(n) cost, small window).
                    buckets[k].insert(0, pos)
naive_sunflowers
Bad code that uses an expensive max-finding function.
from utils_list import *
from utils_move import *
clear()
ws = get_world_size()
arr_sf = list(range(ws * ws)) # random
def replant_sf():
    global arr_sf
    goto((0, 0))
    for i in range(ws):
        for j in range(ws):
            if get_ground_type() != Grounds.Soil:
                till()
            plant(Entities.Sunflower)
            arr_sf[serialize_2d_index((j, i), ws)] = measure()
            move(East)
        move(North)
replant_sf()
while True:
    for _ in range(ws * ws):
        val, idx = max_with_index(arr_sf)
        goto(deserialize_2d_index(idx, ws))
        while not can_harvest():
            pass
        harvest()
        cx = get_pos_x()
        cy = get_pos_y()
        arr_sf[serialize_2d_index((cx, cy), ws)] = 0
    replant_sf()
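To see why the naive version is expensive: every harvest rescans the whole array for the maximum, so a field of n tiles costs about n scans of n entries each. max_with_index is imported from utils_list and is not shown here, so the version below is an assumed stand-in, not the actual helper. naive_harvest_order and the sample petal counts are also made up for illustration. Plain Python, runnable outside the game.

```python
def max_with_index(values):
    # Assumed behaviour: (largest value, index of its first occurrence).
    best_val, best_idx = values[0], 0
    for i in range(1, len(values)):
        if values[i] > best_val:
            best_val, best_idx = values[i], i
    return best_val, best_idx


def naive_harvest_order(petals):
    # Repeatedly pick the current maximum, then zero it out,
    # mirroring the arr_sf[...] = 0 step in the naive script.
    remaining = list(petals)
    order = []
    entries_scanned = 0
    for _ in range(len(remaining)):
        _, idx = max_with_index(remaining)
        entries_scanned += len(remaining)  # one full scan per harvest
        order.append(idx)
        remaining[idx] = 0
    return order, entries_scanned


order, entries_scanned = naive_harvest_order([9, 15, 7, 15, 12])
# order == [1, 3, 4, 0, 2], entries_scanned == 25 (5 scans of 5 entries)
```

For comparison, the bucket version only walks the field once to fill the buckets.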
Maze (Parallelization Cheese)
I have implemented the hilarious cheese strategy detailed in the below guide:
https://steamcommunity.com/sharedfiles/filedetails/?id=3588071660
This strategy makes use of the fact that, for certain combos of world size and max_drones, we have more drones available than there are maze tiles. Since we are not constrained to using one drone at a time, we can cheese the maze-solving problem: station a drone on every tile and check the whole maze at once, instead of expensively traversing it with a single drone.
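A rough plain-Python sketch of the idea, runnable outside the game. plan_maze_cheese and find_treasure are hypothetical names, the drone cap of 32 is just an example input, and the index-to-(x, y) mapping is an assumption that may not match deserialize_2d_index.

```python
def plan_maze_cheese(world_size, drone_cap):
    # Returns one (x, y) station per tile, or None if the drones
    # cannot cover the whole board.
    total_tiles = world_size ** 2
    if total_tiles > drone_cap:
        return None
    stations = []
    for idx in range(total_tiles):
        # Assumed row-major layout: idx = y * world_size + x.
        stations.append((idx % world_size, idx // world_size))
    return stations


def find_treasure(stations, treasure_pos):
    # Each drone only looks at its own tile, so no pathfinding is needed:
    # whichever drone is parked on the treasure reports it.
    for drone_id, pos in enumerate(stations):
        if pos == treasure_pos:
            return drone_id
    return None


stations = plan_maze_cheese(5, 32)        # 25 tiles fit under a cap of 32
too_big = plan_maze_cheese(6, 32)         # 36 tiles do not fit, so None
finder = find_treasure(stations, (3, 2))  # drone 13 under the assumed layout
```

In the game the per-tile checks happen at the same time, one drone each. The loop in find_treasure only stands in for that.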
In light of this new code I've made, I also added a flag to my onecrop polyculture code to enable farming Weird Substance.
# Consumes approximately 25k Weird Substance per second.
# Might have a bug with replanting a separate maze
# while the current 300-maze run is in progress. I suspect
# it is due to THRESH_SOLVED, but since there is barely any
# overhead, and because gold scales linearly with maze
# regeneration, I will ignore this bug for now.
from utils_list import *
from utils_drone import *
from utils_grow import *
from utils_farm import *
from utils_move import *
from utils_item import *
MANUAL_WORLD_SIZE = 5# Don't change this.
THRESH_SOLVED = 3# Consider 300 mazes over when applying
# this number of weird substance without gold location changing.
THRESH_NO_MAZE = 50# Number of times to check for lack of maze to
# consider the _subtask_worker job finished.
COST_MAZE = MANUAL_WORLD_SIZE * 2**(num_unlocked(Unlocks.Mazes) - 1)
def stop_condition():
    return num_items(Items.Gold) > 100000000
def _factory_maze(args_tuple):
    # args_tuple: (start_pos_1d,)
    # If the start pos is outside of the board, consider it a maze_starter worker.
    start_pos_1d = args_tuple[0]
    if start_pos_1d >= MANUAL_WORLD_SIZE**2:
        def _subtask_maze_starter():
            # optionally delay here?
            while True:
                if stop_condition():
                    break
                if get_entity_type() not in (Entities.Hedge, Entities.Treasure):
                    plant(Entities.Bush)
                    use_item(Items.Weird_Substance, COST_MAZE)
            return True
        return _subtask_maze_starter
    else:
        start_pos = deserialize_2d_index(start_pos_1d, MANUAL_WORLD_SIZE)
        def _subtask_worker():
            goto(start_pos)
            while get_entity_type() not in (Entities.Hedge, Entities.Treasure):
                pass# Wait for maze to be initialized
            while True:
                counter = 0
                if stop_condition():
                    break
                while get_entity_type() == Entities.Treasure:
                    ploc_treasure = get_pos()
                    use_item(Items.Weird_Substance, COST_MAZE)
                    cloc_treasure = measure()
                    if ploc_treasure == cloc_treasure:
                        counter += 1
                        if counter >= THRESH_SOLVED:
                            harvest()
                while get_entity_type() not in (Entities.Hedge, Entities.Treasure):
                    # A worker drone has finished and harvested the maze
                    counter += 1
                    if counter >= THRESH_NO_MAZE:
                        return True
            return True
        return _subtask_worker
def main():
    total_tiles = MANUAL_WORLD_SIZE**2
    drone_cap = max_drones_safe()
    if total_tiles > drone_cap:
        quick_print("Exception: Cannot cover the board with drones.")
        return
    if not check_has_item(Items.Weird_Substance, COST_MAZE * 300):
        quick_print("Exception: You don't have enough Weird Substance")
        return
    if stop_condition():
        quick_print("Stop condition already met.")
        return
    clear()
    set_world_size(MANUAL_WORLD_SIZE)
    args = []
    for i in range(MANUAL_WORLD_SIZE**2 + 1):
        args.append((i,))
    drone_task(_factory_maze, args)
if __name__ == "__main__":
    main()# Using this notation because I need to raise exception
    # JIC, then return None
Other
Maze Naive Search or Maze Parallelization Search
Unfortunately, I don't have a naive mazerunner for you. When I was playing the early game I had the max multiplier on maze output because I farmed Weird Substance, and I just took a random BFS/DFS script off someone else while I coded my sunflower (parallelization) code. Here is the source I used:
https://steamcommunity.com/sharedfiles/filedetails/?id=3408470559
Dino
Unfortunately there is no parallelization for farming bones, since only one drone can wear the dino hat. You'll need a better dino script, one that can compute shortcuts along a given optimal snake path. I used someone else's script that tediously runs the same board-covering pattern for like 7 minutes per run while I was coding my shakersort parallelization modification. Here is the source I used:
https://steamcommunity.com/sharedfiles/filedetails/?id=3585506194
Misc
Stack Overflow
Quite classic.
def recur():
    recur()
if __name__ == "__main__":
    recur()
Wrong Order
Simply invert the sorting condition of the algorithm, from less-than to greater-than (and conversely for the reverse pass of shakersort).
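A minimal plain-Python sketch of a shakersort where the order is flipped by inverting one comparison. This is not my in-game cactus code: shaker_sort, the descending flag and the out_of_order helper are made up for illustration, and here both passes share a single comparison, so one flip covers the forward and the reverse pass.

```python
def shaker_sort(values, descending=False):
    items = list(values)

    def out_of_order(left, right):
        # The only line that decides the sort direction.
        if descending:
            return left < right
        return left > right

    lo, hi = 0, len(items) - 1
    swapped = True
    while swapped and lo < hi:
        swapped = False
        # Forward pass: bubble the "last" element towards the end.
        for i in range(lo, hi):
            if out_of_order(items[i], items[i + 1]):
                items[i], items[i + 1] = items[i + 1], items[i]
                swapped = True
        hi -= 1
        # Reverse pass: bubble the "first" element towards the start.
        for i in range(hi, lo, -1):
            if out_of_order(items[i - 1], items[i]):
                items[i - 1], items[i] = items[i], items[i - 1]
                swapped = True
        lo += 1
    return items


right_order = shaker_sort([5, 2, 9, 1, 5, 6])
wrong_order = shaker_sort([5, 2, 9, 1, 5, 6], descending=True)
```

If your forward and reverse passes each have their own comparison, both need to be flipped.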
Fashion Show
Left as an exercise to the reader.
Infinite Loop
f0
from f1 import *
f1
from f0 import *
Then run either file.
Pet the Piggy
Come on, who hasn't run this command the moment they saw it?
Outro
Please share your code in a steam guide or discussion as well. I'm lazy and want drop-in code for maze-finding.
Also automation and meta-optimization.